IGNITE-535 (WIP) MQTT Streamer.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53683e20 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53683e20 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53683e20 Branch: refs/heads/ignite-843 Commit: 53683e20d304dfd96d544f286e8460d3829598d8 Parents: 6b53f1b Author: Raul Kripalani <ra...@apache.org> Authored: Mon Sep 21 16:23:28 2015 +0100 Committer: Raul Kripalani <ra...@apache.org> Committed: Mon Sep 21 16:23:28 2015 +0100 ---------------------------------------------------------------------- modules/mqtt/pom.xml | 39 +- .../apache/ignite/stream/mqtt/MqttStreamer.java | 336 +++++++++++--- .../stream/mqtt/IgniteMqttStreamerTest.java | 435 +++++++++++++++++++ .../ignite/stream/mqtt/TestTupleExtractors.java | 28 -- 4 files changed, 741 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/pom.xml ---------------------------------------------------------------------- diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml index b108180..4b0b46c 100644 --- a/modules/mqtt/pom.xml +++ b/modules/mqtt/pom.xml @@ -37,7 +37,8 @@ <properties> <paho.version>1.0.2</paho.version> - <mosquette.version>0.7</mosquette.version> + <activemq.version>5.11.1</activemq.version> + <guava-retryier.version>2.0.0</guava-retryier.version> </properties> <dependencies> @@ -54,9 +55,29 @@ </dependency> <dependency> - <groupId>org.eclipse.moquette</groupId> - <artifactId>moquette-broker</artifactId> - <version>${mosquette.version}</version> + <groupId>com.github.rholder</groupId> + <artifactId>guava-retrying</artifactId> + <version>${guava-retryier.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-kahadb-store</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-mqtt</artifactId> + <version>${activemq.version}</version> <scope>test</scope> </dependency> @@ -86,16 +107,6 @@ <!-- Repository for Mosquette (embedded MQTT broker for tests) and for Eclipse Paho (MQTT client) --> <repositories> <repository> - <id>bintray</id> - <url>http://dl.bintray.com/andsel/maven/</url> - <releases> - <enabled>true</enabled> - </releases> - <snapshots> - <enabled>false</enabled> - </snapshots> - </repository> - <repository> <id>Eclipse Paho Repo</id> <url>https://repo.eclipse.org/content/repositories/paho-releases/</url> <releases> http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java index 00a89ab..b86d385 100644 --- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java +++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java @@ -20,6 +20,10 @@ package org.apache.ignite.stream.mqtt; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; @@ -29,12 +33,19 @@ import org.apache.ignite.stream.StreamAdapter; import org.apache.ignite.stream.StreamMultipleTupleExtractor; import org.apache.ignite.stream.StreamSingleTupleExtractor; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.StopStrategy; +import com.github.rholder.retry.WaitStrategies; +import com.github.rholder.retry.WaitStrategy; +import com.google.common.base.Joiner; +import com.google.common.base.Predicate; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClientPersistence; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; /** @@ -90,8 +101,20 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme private Integer disconnectForciblyTimeout; + private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait(); + + private StopStrategy retryStopStrategy = StopStrategies.neverStop(); + + private MqttConnectionRetrier connectionRetrier; + private volatile boolean stopped = true; + private volatile boolean connected; + + private String cachedLogPrefix; + + private boolean blockUntilConnected; + /** * Starts streamer. * @@ -109,18 +132,21 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme // parameter validations A.notNull(getStreamer(), "streamer"); A.notNull(getIgnite(), "ignite"); - A.ensure(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null, "tuple extractor missing"); + A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null), "tuple extractor missing"); A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " + "both single and multiple tuple extractor"); A.notNullOrEmpty(brokerUrl, "broker URL"); A.notNullOrEmpty(clientId, "client ID"); - // if we have both a single topic and a list of topics, fail - if (topic != null && topic.length() > 0 && !topics.isEmpty()) + // if we have both a single topic and a list of topics (but the list of topic is not of + // size 1 and == topic, as this would be a case of re-initialization), fail + if (topic != null && topic.length() > 0 && !topics.isEmpty() && + topics.size() != 1 && !topics.get(0).equals(topic)) throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time"); - // if we have both a single QoS and list, fail - if (qualityOfService != null && !qualitiesOfService.isEmpty()) { + // same as above but for QoS + if (qualityOfService != null && !qualitiesOfService.isEmpty() && qualitiesOfService.size() != 1 && + !qualitiesOfService.get(0).equals(qualityOfService)) { throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time"); } @@ -131,12 +157,22 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme } // if we have multiple topics - if (topics != null && !topics.isEmpty()) { - for (String t : topics) { + if (!topics.isEmpty()) { + for (String t : topics) A.notNullOrEmpty(t, "topic in list of topics"); - } + A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " + "service must be either empty or have the same size as topics list"); + + cachedLogPrefix = "[" + Joiner.on(",").join(topics) + "]"; + } + else { // just the single topic + topics.add(topic); + + if (qualityOfService != null) + qualitiesOfService.add(qualityOfService); + + cachedLogPrefix = "[" + topic + "]"; } // create logger @@ -148,10 +184,28 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme else client = new MqttClient(brokerUrl, clientId, persistence); - connectAndSubscribe(); + // set this as a callback + client.setCallback(this); + // set stopped to false, as the connection will start async stopped = false; + // build retrier + Retryer<Boolean> retrier = RetryerBuilder.<Boolean>newBuilder() + .retryIfResult(new Predicate<Boolean>() { + @Override public boolean apply(Boolean connected) { + return !connected; + } + }) + .retryIfException().retryIfRuntimeException() + .withWaitStrategy(retryWaitStrategy) + .withStopStrategy(retryStopStrategy) + .build(); + + // create the connection retrier + connectionRetrier = new MqttConnectionRetrier(retrier); + connectionRetrier.connect(); + } catch (Throwable t) { throw new IgniteException("Exception while initializing MqttStreamer", t); @@ -159,34 +213,6 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme } - private void connectAndSubscribe() throws MqttException { - // connect - if (connectOptions != null) - client.connect(); - else - client.connect(connectOptions); - - // subscribe to multiple topics - if (!topics.isEmpty()) { - if (qualitiesOfService.isEmpty()) { - client.subscribe(topics.toArray(new String[0])); - } else { - int[] qoses = new int[qualitiesOfService.size()]; - for (int i = 0; i < qualitiesOfService.size(); i++) - qoses[i] = qualitiesOfService.get(i); - - client.subscribe(topics.toArray(new String[0]), qoses); - } - } else { - // subscribe to a single topic - if (qualityOfService == null) { - client.subscribe(topic); - } else { - client.subscribe(topic, qualityOfService); - } - } - } - /** * Stops streamer. */ @@ -194,50 +220,250 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme if (stopped) throw new IgniteException("Attempted to stop an already stopped MQTT Streamer"); + // stop the retrier + connectionRetrier.stop(); + try { if (disconnectForcibly) { - if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null) { + if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null) client.disconnectForcibly(); - } else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null) { + + else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null) client.disconnectForcibly(disconnectForciblyTimeout); - } else { + + else client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout); - } + } else { - if (disconnectQuiesceTimeout == null) { + if (disconnectQuiesceTimeout == null) client.disconnect(); - } else { + + else client.disconnect(disconnectQuiesceTimeout); - } + } + + client.close(); + connected = false; + stopped = true; + } catch (Throwable t) { throw new IgniteException("Exception while stopping MqttStreamer", t); } } + // ------------------------------- + // MQTT Client callback methods + // ------------------------------- + @Override public void connectionLost(Throwable throwable) { - log.warning(String.format("MQTT Connection to server %s was lost due to", brokerUrl), throwable); - // TODO: handle reconnect attempts with an optional backoff mechanism (linear, exponential, finonacci) - try { - connectAndSubscribe(); - } - catch (MqttException e) { - e.printStackTrace(); - } + connected = false; + + // if we have been stopped, we do not try to establish the connection again + if (stopped) + return; + + log.warning(String.format("MQTT Connection to server %s was lost.", brokerUrl), throwable); + connectionRetrier.connect(); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { if (getMultipleTupleExtractor() != null) { Map<K, V> entries = getMultipleTupleExtractor().extract(message); + if (log.isTraceEnabled()) { + log.trace("Adding cache entries: " + entries); + } getStreamer().addData(entries); - } else { + } + else { Map.Entry<K, V> entry = getSingleTupleExtractor().extract(message); + if (log.isTraceEnabled()) { + log.trace("Adding cache entry: " + entry); + } getStreamer().addData(entry); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { - // ignore, we don't send messages + // ignore, as we don't send messages + } + + // ------------------------------- + // Getters and setters + // ------------------------------- + + public String getBrokerUrl() { + return brokerUrl; + } + + public void setBrokerUrl(String brokerUrl) { + this.brokerUrl = brokerUrl; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public Integer getQualityOfService() { + return qualityOfService; + } + + public void setQualityOfService(Integer qualityOfService) { + this.qualityOfService = qualityOfService; + } + + public List<String> getTopics() { + return topics; + } + + public void setTopics(List<String> topics) { + this.topics = topics; + } + + public List<Integer> getQualitiesOfService() { + return qualitiesOfService; + } + + public void setQualitiesOfService(List<Integer> qualitiesOfService) { + this.qualitiesOfService = qualitiesOfService; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public MqttClientPersistence getPersistence() { + return persistence; + } + + public void setPersistence(MqttClientPersistence persistence) { + this.persistence = persistence; + } + + public MqttConnectOptions getConnectOptions() { + return connectOptions; + } + + public void setConnectOptions(MqttConnectOptions connectOptions) { + this.connectOptions = connectOptions; + } + + public boolean isDisconnectForcibly() { + return disconnectForcibly; + } + + public void setDisconnectForcibly(boolean disconnectForcibly) { + this.disconnectForcibly = disconnectForcibly; + } + + public Integer getDisconnectQuiesceTimeout() { + return disconnectQuiesceTimeout; + } + + public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) { + this.disconnectQuiesceTimeout = disconnectQuiesceTimeout; + } + + public Integer getDisconnectForciblyTimeout() { + return disconnectForciblyTimeout; + } + + public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) { + this.disconnectForciblyTimeout = disconnectForciblyTimeout; } + + public WaitStrategy getRetryWaitStrategy() { + return retryWaitStrategy; + } + + public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) { + this.retryWaitStrategy = retryWaitStrategy; + } + + public StopStrategy getRetryStopStrategy() { + return retryStopStrategy; + } + + public void setRetryStopStrategy(StopStrategy retryStopStrategy) { + this.retryStopStrategy = retryStopStrategy; + } + + public boolean isBlockUntilConnected() { + return blockUntilConnected; + } + + public void setBlockUntilConnected(boolean blockUntilConnected) { + this.blockUntilConnected = blockUntilConnected; + } + + private class MqttConnectionRetrier { + + private final Retryer<Boolean> retrier; + private ExecutorService executor = Executors.newSingleThreadExecutor(); + + public MqttConnectionRetrier(Retryer<Boolean> retrier) { + this.retrier = retrier; + } + + public void connect() { + Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + // if we're already connected, return immediately + if (connected) + return true; + + if (stopped) + return false; + + // connect to broker + if (connectOptions == null) + client.connect(); + else + client.connect(connectOptions); + + // always use the multiple topics variant of the mqtt client; even if the user specified a single + // topic and/or QoS, the initialization code would have placed it inside the 1..n structures + if (qualitiesOfService.isEmpty()) + client.subscribe(topics.toArray(new String[0])); + + else { + int[] qoses = new int[qualitiesOfService.size()]; + for (int i = 0; i < qualitiesOfService.size(); i++) + qoses[i] = qualitiesOfService.get(i); + + client.subscribe(topics.toArray(new String[0]), qoses); + } + + connected = true; + return connected; + } + }); + + Future<Boolean> result = executor.submit(callable); + + if (blockUntilConnected) { + try { + result.get(); + } + catch (Throwable e) { + throw new RuntimeException(e); + } + } + } + + public void stop() { + executor.shutdownNow(); + } + + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java index 59730fa..012486a 100644 --- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java +++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java @@ -17,11 +17,47 @@ package org.apache.ignite.stream.mqtt; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.events.CacheEvent; +import org.apache.ignite.internal.util.lang.GridMapEntry; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.stream.StreamMultipleTupleExtractor; +import org.apache.ignite.stream.StreamSingleTupleExtractor; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; +import com.google.common.base.Splitter; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.After; import org.junit.Before; +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; + /** * Test for {@link MqttStreamer}. * @@ -29,6 +65,24 @@ import org.junit.Before; */ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { + private static final Map<Integer, String> TEST_DATA = new HashMap<>(); + private static final String SINGLE_TOPIC_NAME = "abc"; + private static final List<String> MULTIPLE_TOPIC_NAMES = Arrays.asList("def", "ghi", "jkl", "mno"); + + private BrokerService broker; + private MqttClient client; + private String brokerUrl; + private int port; + private MqttStreamer<Integer, String> streamer; + private UUID remoteListener; + + static { + for (int i = 0; i < 100; i++) + TEST_DATA.put(i, "v" + i); + } + + private IgniteDataStreamer<Integer, String> dataStreamer; + /** Constructor. */ public IgniteMqttStreamerTest() { super(true); @@ -38,13 +92,394 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { public void beforeTest() throws Exception { grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration()); + // find an available local port + try (ServerSocket ss = new ServerSocket(0)) { + port = ss.getLocalPort(); + } + + // create the broker + broker = new BrokerService(); + broker.deleteAllMessages(); + broker.setPersistent(false); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setQueuePrefetch(1); + broker.setDestinationPolicy(policyMap); + broker.getDestinationPolicy().setDefaultEntry(policy); + + // add the MQTT transport connector to the broker + broker.addConnector("mqtt://localhost:" + port); + broker.setStartAsync(false); + broker.start(true); + + // create the broker URL + brokerUrl = "tcp://localhost:" + port; + + // create the client and connect + client = new MqttClient(brokerUrl, UUID.randomUUID().toString(), new MemoryPersistence()); + client.connect(); + + // create mqtt streamer + dataStreamer = grid().dataStreamer(null); + streamer = createMqttStreamer(dataStreamer); } @After public void afterTest() throws Exception { + try { + streamer.stop(); + } + catch (Exception e) { + // ignore if already stopped + } + + dataStreamer.close(); + grid().cache(null).clear(); + broker.stop(); + broker.deleteAllMessages(); + + } + + public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception { + // configure streamer + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setTopic(SINGLE_TOPIC_NAME); + + // subscribe to cache PUT events + CountDownLatch latch = subscribeToPutEvents(50); + + // action time + streamer.start(); + + // send messages + sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false); + + // assertions + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + } + + public void testMultipleTopics_NoQoS_OneEntryPerMessage() throws Exception { + // configure streamer + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setTopics(MULTIPLE_TOPIC_NAMES); + + // subscribe to cache PUT events + CountDownLatch latch = subscribeToPutEvents(50); + + // action time + streamer.start(); + + // send messages + sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, false); + + // assertions + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + + assertTrue(broker.getBroker().getDestinationMap().size() >= 4); + assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def"))); + assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi"))); + } + + public void testSingleTopic_NoQoS_MultipleEntriesOneMessage() throws Exception { + // configure streamer + streamer.setMultipleTupleExtractor(multipleTupleExtractor()); + streamer.setTopic(SINGLE_TOPIC_NAME); + + // subscribe to cache PUT events + CountDownLatch latch = subscribeToPutEvents(50); + + // action time + streamer.start(); + + // send messages + sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, true); + + // assertions + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + } + + public void testMultipleTopics_NoQoS_MultipleEntriesOneMessage() throws Exception { + // configure streamer + streamer.setMultipleTupleExtractor(multipleTupleExtractor()); + streamer.setTopics(MULTIPLE_TOPIC_NAMES); + + // subscribe to cache PUT events + CountDownLatch latch = subscribeToPutEvents(50); + + // action time + streamer.start(); + + // send messages + sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, true); + + // assertions + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + + assertTrue(broker.getBroker().getDestinationMap().size() >= 4); + assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def"))); + assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi"))); + } + + public void testSingleTopic_NoQoS_ConnectOptions_Durable() throws Exception { + // configure streamer + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setTopic(SINGLE_TOPIC_NAME); + + MqttConnectOptions connOptions = new MqttConnectOptions(); + connOptions.setCleanSession(false); + streamer.setConnectOptions(connOptions); + + // subscribe to cache PUT events + CountDownLatch latch = subscribeToPutEvents(50); + + // action time + streamer.start(); + + // send messages + sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false); + + // assertions + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + + // explicitly stop the streamer + streamer.stop(); + + // send messages while stopped + sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false); + + latch = subscribeToPutEvents(50); + + // start the streamer again + streamer.start(); + + // assertions - make sure that messages sent during disconnection were also received + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(100); + } + + public void testSingleTopic_NoQoS_Reconnect() throws Exception { + // configure streamer + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setRetryWaitStrategy(WaitStrategies.noWait()); + streamer.setRetryStopStrategy(StopStrategies.neverStop()); + streamer.setTopic(SINGLE_TOPIC_NAME); + + // subscribe to cache PUT events + CountDownLatch latch = subscribeToPutEvents(50); + + // action time + streamer.start(); + + // send messages + sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false); + + // assertions + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + + // now shutdown the broker, wait 2 seconds and start it again + broker.stop(); + broker.start(true); + broker.waitUntilStarted(); + Thread.sleep(2000); + client.connect(); + + // let's ensure we have 2 connections: Ignite and our test + assertEquals(2, broker.getTransportConnectorByScheme("mqtt").getConnections().size()); + + // subscribe to cache PUT events again + latch = subscribeToPutEvents(50); + + // send messages + sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false); + + // assertions + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(100); + } + + public void testSingleTopic_NoQoS_RetryOnce() throws Exception { + // configure streamer + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setRetryWaitStrategy(WaitStrategies.noWait()); + streamer.setRetryStopStrategy(StopStrategies.stopAfterAttempt(1)); + streamer.setTopic(SINGLE_TOPIC_NAME); + + // subscribe to cache PUT events + CountDownLatch latch = subscribeToPutEvents(50); + + // action time + streamer.start(); + + // send messages + sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false); + + // assertions + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + + // now shutdown the broker, wait 2 seconds and start it again + broker.stop(); + broker.start(true); + broker.waitUntilStarted(); + client.connect(); + + // lets send messages and ensure they are not received, because our retrier desisted + sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false); + Thread.sleep(3000); + assertNull(grid().cache(null).get(50)); + + } + + public void testMultipleTopics_MultipleQoS_OneEntryPerMessage() throws Exception { + // configure streamer + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setTopics(MULTIPLE_TOPIC_NAMES); + streamer.setQualitiesOfService(Arrays.asList(1, 1, 1, 1)); + + // subscribe to cache PUT events + CountDownLatch latch = subscribeToPutEvents(50); + + // action time + streamer.start(); + + // send messages + sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, false); + + // assertions + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + + assertTrue(broker.getBroker().getDestinationMap().size() >= 4); + assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def"))); + assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi"))); + } + + public void testMultipleTopics_MultipleQoS_Mismatch() throws Exception { + // configure streamer + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setTopics(MULTIPLE_TOPIC_NAMES); + streamer.setQualitiesOfService(Arrays.asList(1, 1, 1)); + + try { + streamer.start(); + } + catch (Exception e) { + return; + } + fail("Expected an exception reporting invalid parameters"); + + } + + private MqttStreamer<Integer, String> createMqttStreamer(IgniteDataStreamer<Integer, String> dataStreamer) { + MqttStreamer<Integer, String> streamer = new MqttStreamer<>(); + streamer.setIgnite(grid()); + streamer.setStreamer(dataStreamer); + streamer.setBrokerUrl(brokerUrl); + streamer.setClientId(UUID.randomUUID().toString()); + streamer.setBlockUntilConnected(true); + + dataStreamer.allowOverwrite(true); + dataStreamer.autoFlushFrequency(1); + + return streamer; + } + + public void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException { + if (singleMessage) { + final List<StringBuilder> sbs = new ArrayList<>(topics.size()); + // initialize String Builders for each topic + F.forEach(topics, new IgniteInClosure<String>() { + @Override public void apply(String s) { + sbs.add(new StringBuilder()); + } + }); + // fill String Builders for each topic + F.forEach(F.range(fromIdx, fromIdx + count), new IgniteInClosure<Integer>() { + @Override public void apply(Integer integer) { + sbs.get(integer % topics.size()).append(integer.toString() + "," + TEST_DATA.get(integer) + "\n"); + } + }); + // send each buffer out + for (int i = 0; i < topics.size(); i++) { + MqttMessage msg = new MqttMessage(sbs.get(i).toString().getBytes()); + client.publish(topics.get(i % topics.size()), msg); + } + } + else { + for (int i = fromIdx; i < fromIdx + count; i++) { + byte[] payload = (i + "," + TEST_DATA.get(i)).getBytes(); + MqttMessage msg = new MqttMessage(payload); + client.publish(topics.get(i % topics.size()), msg); + } + } + } + + private CountDownLatch subscribeToPutEvents(int expect) { + Ignite ignite = grid(); + + // Listen to cache PUT events and expect as many as messages as test data items + final CountDownLatch latch = new CountDownLatch(expect); + @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() { + @Override public boolean apply(UUID uuid, CacheEvent evt) { + latch.countDown(); + return true; + } + }; + + remoteListener = ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(callback, null, EVT_CACHE_OBJECT_PUT); + return latch; + } + + private void assertCacheEntriesLoaded(int count) { + // get the cache and check that the entries are present + IgniteCache<Integer, String> cache = grid().cache(null); + + // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache + for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count)) { + assertEquals(TEST_DATA.get(key), cache.get(key)); + } + + // assert that the cache exactly the specified amount of elements + assertEquals(count, cache.size(CachePeekMode.ALL)); + + // remove the event listener + grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener); + } + + public static StreamSingleTupleExtractor<MqttMessage, Integer, String> singleTupleExtractor() { + return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() { + @Override public Map.Entry<Integer, String> extract(MqttMessage msg) { + List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload())); + return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1)); + } + }; + } + public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipleTupleExtractor() { + return new StreamMultipleTupleExtractor<MqttMessage, Integer, String>() { + @Override public Map<Integer, String> extract(MqttMessage msg) { + final Map<String, String> map = Splitter.on("\n") + .omitEmptyStrings() + .withKeyValueSeparator(",") + .split(new String(msg.getPayload())); + final Map<Integer, String> answer = new HashMap<>(); + F.forEach(map.keySet(), new IgniteInClosure<String>() { + @Override public void apply(String s) { + answer.put(Integer.parseInt(s), map.get(s)); + } + }); + return answer; + } + }; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java deleted file mode 100644 index e2ed0f0..0000000 --- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.stream.mqtt; - -/** - * Test transformers for MqttStreamer tests. - * - * @author Raul Kripalani - */ -public class TestTupleExtractors { - - -} \ No newline at end of file