This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch STORM-3988
in repository https://gitbox.apache.org/repos/asf/storm.git

commit 8a2070dfb15ec70aff4c9c21137ec80858eff69b
Author: Richard Zowalla <[email protected]>
AuthorDate: Thu Oct 19 08:58:53 2023 +0200

    STORM-3988 - Remove "storm-mqtt"
---
 examples/storm-mqtt-examples/pom.xml               | 145 --------
 .../storm-mqtt-examples/src/main/flux/sample.yaml  |  62 ----
 .../src/main/flux/ssl-sample.yaml                  |  78 -----
 .../storm/mqtt/examples/CustomMessageMapper.java   |  79 -----
 .../storm/mqtt/examples/MqttBrokerPublisher.java   | 137 --------
 .../apache/storm/mqtt/examples/package-info.java   |  20 --
 .../src/main/resources/log4j2.xml                  |  32 --
 external/storm-mqtt/README.md                      | 375 ---------------------
 external/storm-mqtt/pom.xml                        | 148 --------
 .../java/org/apache/storm/mqtt/MqttLogger.java     |  30 --
 .../java/org/apache/storm/mqtt/MqttMessage.java    |  37 --
 .../org/apache/storm/mqtt/MqttMessageMapper.java   |  38 ---
 .../org/apache/storm/mqtt/MqttTupleMapper.java     |  31 --
 .../java/org/apache/storm/mqtt/bolt/MqttBolt.java  |  97 ------
 .../org/apache/storm/mqtt/common/MqttOptions.java  | 301 -----------------
 .../apache/storm/mqtt/common/MqttPublisher.java    |  59 ----
 .../org/apache/storm/mqtt/common/MqttUtils.java    |  82 -----
 .../org/apache/storm/mqtt/common/SslUtils.java     |  57 ----
 .../storm/mqtt/mappers/ByteArrayMessageMapper.java |  31 --
 .../storm/mqtt/mappers/StringMessageMapper.java    |  34 --
 .../apache/storm/mqtt/spout/AckableMessage.java    |  70 ----
 .../org/apache/storm/mqtt/spout/MqttSpout.java     | 270 ---------------
 .../storm/mqtt/ssl/DefaultKeyStoreLoader.java      |  92 -----
 .../org/apache/storm/mqtt/ssl/KeyStoreLoader.java  |  34 --
 .../storm/mqtt/trident/MqttPublishFunction.java    |  78 -----
 .../storm/mqtt/StormMqttIntegrationTest.java       | 145 --------
 pom.xml                                            |   2 -
 27 files changed, 2564 deletions(-)

diff --git a/examples/storm-mqtt-examples/pom.xml b/examples/storm-mqtt-examples/pom.xml
deleted file mode 100644
index 64a5790da..000000000
--- a/examples/storm-mqtt-examples/pom.xml
+++ /dev/null
@@ -1,145 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>storm-mqtt-examples</artifactId>
-  <packaging>jar</packaging>
-
-  <name>storm-mqtt-examples</name>
-
-  <parent>
-    <artifactId>storm</artifactId>
-    <groupId>org.apache.storm</groupId>
-    <version>2.6.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-  
-  <dependencies>
-   <dependency>
-      <groupId>org.apache.storm</groupId>
-      <artifactId>storm-client</artifactId>
-      <version>${project.version}</version>
-      <scope>${provided.scope}</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.storm</groupId>
-      <artifactId>storm-mqtt</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <!-- required to bypass failure in mvn deploy -->
-          <groupId>org.apache.activemq.protobuf</groupId>
-          <artifactId>activemq-protobuf</artifactId>
-        </exclusion>
-      </exclusions>
-
-    </dependency>
-    <dependency>
-      <groupId>org.apache.storm</groupId>
-      <artifactId>flux-core</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.fusesource.mqtt-client</groupId>
-      <artifactId>mqtt-client</artifactId>
-      <version>1.16</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-broker</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-mqtt</artifactId>
-      <exclusions>
-        <exclusion>
-          <!-- required to bypass failure in mvn deploy -->
-          <groupId>org.apache.activemq.protobuf</groupId>
-          <artifactId>activemq-protobuf</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-kahadb-store</artifactId>
-      <exclusions>
-        <exclusion>
-          <!-- required to bypass failure in mvn deploy -->
-          <groupId>org.apache.activemq.protobuf</groupId>
-          <artifactId>activemq-protobuf</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-  </dependencies>
-  <build>
-    <plugins>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <configuration>
-          <createDependencyReducedPom>true</createDependencyReducedPom>
-          <filters>
-            <filter>
-              <artifact>*:*</artifact>
-              <excludes>
-                <exclude>META-INF/*.SF</exclude>
-                <exclude>META-INF/*.sf</exclude>
-                <exclude>META-INF/*.DSA</exclude>
-                <exclude>META-INF/*.dsa</exclude>
-                <exclude>META-INF/*.RSA</exclude>
-                <exclude>META-INF/*.rsa</exclude>
-                <exclude>META-INF/*.EC</exclude>
-                <exclude>META-INF/*.ec</exclude>
-                <exclude>META-INF/MSFTSIG.SF</exclude>
-                <exclude>META-INF/MSFTSIG.RSA</exclude>
-              </excludes>
-            </filter>
-        </filters>
-        </configuration>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                  <mainClass>org.apache.storm.flux.Flux</mainClass>
-                </transformer>
-              </transformers>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-        <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-checkstyle-plugin</artifactId>
-            <!--Note - the version would be inherited-->
-        </plugin>
-        <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-pmd-plugin</artifactId>
-        </plugin>
-    </plugins>
-  </build>
-
-</project>
diff --git a/examples/storm-mqtt-examples/src/main/flux/sample.yaml b/examples/storm-mqtt-examples/src/main/flux/sample.yaml
deleted file mode 100644
index c2902dcff..000000000
--- a/examples/storm-mqtt-examples/src/main/flux/sample.yaml
+++ /dev/null
@@ -1,62 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
----
-
-# topology definition
-# name to be used when submitting
-name: "mqtt-topology"
-
-components:
-   ########## MQTT Spout Config ############
-  - id: "mqtt-type"
-    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
-
-  - id: "mqtt-options"
-    className: "org.apache.storm.mqtt.common.MqttOptions"
-    properties:
-      - name: "url"
-        value: "tcp://localhost:1883"
-      - name: "topics"
-        value:
-          - "/users/tgoetz/#"
-
-# topology configuration
-config:
-  topology.workers: 1
-  topology.max.spout.pending: 1000
-
-# spout definitions
-spouts:
-  - id: "mqtt-spout"
-    className: "org.apache.storm.mqtt.spout.MqttSpout"
-    constructorArgs:
-      - ref: "mqtt-type"
-      - ref: "mqtt-options"
-    parallelism: 1
-
-# bolt definitions
-bolts:
-  - id: "log"
-    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
-    parallelism: 1
-
-
-streams:
-  - from: "mqtt-spout"
-    to: "log"
-    grouping:
-      type: SHUFFLE
diff --git a/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml b/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml
deleted file mode 100644
index bfb668d32..000000000
--- a/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml
+++ /dev/null
@@ -1,78 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
----
-
-# topology definition
-# name to be used when submitting
-name: "mqtt-topology"
-
-components:
-   ########## MQTT Spout Config ############
-  - id: "mqtt-type"
-    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
-
-  - id: "keystore-loader"
-    className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"
-    constructorArgs:
-      - "keystore.jks"
-      - "truststore.jks"
-    properties:
-      - name: "keyPassword"
-        value: "password"
-      - name: "keyStorePassword"
-        value: "password"
-      - name: "trustStorePassword"
-        value: "password"
-
-  - id: "mqtt-options"
-    className: "org.apache.storm.mqtt.common.MqttOptions"
-    properties:
-      - name: "url"
-        value: "ssl://raspberrypi.local:8883"
-      - name: "topics"
-        value:
-          - "/users/tgoetz/#"
-
-# topology configuration
-config:
-  topology.workers: 1
-  topology.max.spout.pending: 1000
-
-# spout definitions
-spouts:
-  - id: "mqtt-spout"
-    className: "org.apache.storm.mqtt.spout.MqttSpout"
-    constructorArgs:
-      - ref: "mqtt-type"
-      - ref: "mqtt-options"
-      - ref: "keystore-loader"
-    parallelism: 1
-
-# bolt definitions
-bolts:
-
-  - id: "log"
-    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
-    parallelism: 1
-
-
-streams:
-
-  - from: "mqtt-spout"
-    to: "log"
-    grouping:
-      type: SHUFFLE
diff --git a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
deleted file mode 100644
index b9a7c1fb0..000000000
--- a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mqtt.examples;
-
-import org.apache.storm.mqtt.MqttMessage;
-import org.apache.storm.mqtt.MqttMessageMapper;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Given a topic name: "users/{user}/{location}/{deviceId}"
- * and a payload of "{temperature}/{humidity}"
- * emits a tuple containing
- * {@code user(String), deviceId(String), location(String), temperature(float),
- * humidity(float)}.
- */
-public final class CustomMessageMapper implements MqttMessageMapper {
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOG = LoggerFactory.getLogger(
-            CustomMessageMapper.class);
-    private static final int TOPIC_INDEX_1 = 2;
-    private static final int TOPIC_INDEX_2 = 4;
-    private static final int TOPIC_INDEX_3 = 3;
-
-    /**
-     * Converts MQTT message to an instance of {@code Values}.
-     * @param message the message to convert
-     * @return the converted values
-     */
-    @Override
-    public Values toValues(final MqttMessage message) {
-        String topic = message.getTopic();
-        String[] topicElements = topic.split("/");
-        String[] payloadElements = new String(message.getMessage()).split("/");
-
-        return new Values(topicElements[TOPIC_INDEX_1],
-                topicElements[TOPIC_INDEX_2],
-                topicElements[TOPIC_INDEX_3],
-                Float.parseFloat(payloadElements[0]),
-                Float.parseFloat(payloadElements[1]));
-    }
-
-    /**
-     * Gets the output fields.
-     * @return the output fields
-     */
-    @Override
-    public Fields outputFields() {
-        return new Fields("user",
-                "deviceId",
-                "location",
-                "temperature",
-                "humidity");
-    }
-
-    /**
-     * Constructor.
-     */
-    public CustomMessageMapper() {
-    }
-}
diff --git a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java
deleted file mode 100644
index 4232eb007..000000000
--- a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mqtt.examples;
-
-import java.util.Random;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.storm.mqtt.MqttLogger;
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.QoS;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A MQTT example using a Storm topology.
- */
-public final class MqttBrokerPublisher {
-    private static final Logger LOG = LoggerFactory.getLogger(
-            MqttBrokerPublisher.class);
-    private static BrokerService broker;
-    private static BlockingConnection connection;
-    private static final int TEMPERATURE_MAX = 100;
-    private static final int HUMIDITY_MAX = 100;
-    /**
-     * The default wait in milliseconds.
-     */
-    private static final int WAIT_MILLIS_DEFAULT = 500;
-
-    /**
-     * Initializes {@code broker} and starts it.
-     * @throws Exception if an exception during adding a connector occurs
-     */
-    public static void startBroker() throws Exception {
-        LOG.info("Starting broker...");
-        broker = new BrokerService();
-        broker.addConnector("mqtt://localhost:1883");
-        broker.setDataDirectory("target");
-        broker.start();
-        LOG.info("MQTT broker started");
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                try {
-                    LOG.info("Shutting down MQTT broker...");
-                    broker.stop();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        });
-    }
-
-    /**
-     * Initializes {@code connection}.
-     * @throws Exception if an exception during connecting to connector occurs
-     */
-    public static void startPublisher() throws Exception {
-        MQTT client = new MQTT();
-        client.setTracer(new MqttLogger());
-        client.setHost("tcp://localhost:1883");
-        client.setClientId("MqttBrokerPublisher");
-        connection = client.blockingConnection();
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                try {
-                    LOG.info("Shutting down MQTT client...");
-                    connection.disconnect();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        });
-
-        connection.connect();
-    }
-
-    /**
-     * Publishes topics on connection.
-     * @throws Exception if an exception during publishing occurs
-     */
-    public static void publish() throws Exception {
-        String topic = "/users/tgoetz/office/1234";
-        Random rand = new Random();
-        LOG.info("Publishing to topic {}", topic);
-        LOG.info("Cntrl+C to exit.");
-
-        while (true) {
-            int temp = rand.nextInt(TEMPERATURE_MAX);
-            int hum = rand.nextInt(HUMIDITY_MAX);
-            String payload = temp + "/" + hum;
-
-            connection.publish(topic,
-                    payload.getBytes(),
-                    QoS.AT_LEAST_ONCE,
-                    false);
-            Thread.sleep(WAIT_MILLIS_DEFAULT);
-        }
-    }
-
-    /**
-     * The main method.
-     * @param args the command line arguments
-     * @throws Exception if an exception during connections or transmission
-     *     occurs
-     */
-    public static void main(final String[] args) throws Exception {
-        startBroker();
-        startPublisher();
-        publish();
-    }
-
-    /**
-     * Utility constructor to prevent initialization.
-     */
-    private MqttBrokerPublisher() {
-    }
-}
diff --git a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/package-info.java b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/package-info.java
deleted file mode 100644
index 52c9270b0..000000000
--- a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * MQTT examples.
- */
-package org.apache.storm.mqtt.examples;
diff --git a/examples/storm-mqtt-examples/src/main/resources/log4j2.xml b/examples/storm-mqtt-examples/src/main/resources/log4j2.xml
deleted file mode 100644
index bfe57a197..000000000
--- a/examples/storm-mqtt-examples/src/main/resources/log4j2.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<Configuration status="WARN">
-    <Appenders>
-        <Console name="Console" target="SYSTEM_OUT">
-            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
-        </Console>
-    </Appenders>
-
-    <Loggers>
-        <Logger name="org.apache.storm.flux.wrappers" level="INFO"/>
-        <Logger name="org.apache.storm.mqtt" level="DEBUG"/>
-        <Root level="error">
-            <AppenderRef ref="Console"/>
-        </Root>
-    </Loggers>
-</Configuration>
\ No newline at end of file
diff --git a/external/storm-mqtt/README.md b/external/storm-mqtt/README.md
deleted file mode 100644
index fb5c71ef1..000000000
--- a/external/storm-mqtt/README.md
+++ /dev/null
@@ -1,375 +0,0 @@
-# Storm MQTT Integration
-
-## About
-
-MQTT is a lightweight publish/subscribe protocol frequently used in IoT 
applications.
-
-Further information can be found at http://mqtt.org. The HiveMQ website has a 
great series on 
-[MQTT Essentials](http://www.hivemq.com/mqtt-essentials/).
-
-Features include:
-
-* Full MQTT support (e.g. last will, QoS 0-2, retain, etc.)
-* Spout implementation(s) for subscribing to MQTT topics
-* A bolt implementation for publishing MQTT messages
-* A trident function implementation for publishing MQTT messages
-* Authentication and TLS/SSL support
-* User-defined "mappers" for converting MQTT messages to tuples (subscribers)
-* User-defined "mappers" for converting tuples to MQTT messages (publishers)
-
-
-## Quick Start
-To quickly see MQTT integration in action, follow the instructions below.
-
-**Start a MQTT broker and publisher**
-
-The command below will create an MQTT broker on port 1883, and start a 
publsher that will publish random 
-temperature/humidity values to an MQTT topic.
-
-Open a terminal and execute the following command (change the path as 
necessary):
-
-```bash
-java -cp examples/target/storm-mqtt-examples-*-SNAPSHOT.jar 
org.apache.storm.mqtt.examples.MqttBrokerPublisher
-```
-
-**Run the example toplogy**
-
-Run the sample topology using Flux. This will start a local mode cluster and 
topology that consists of the MQTT Spout
-publishing to a bolt that simply logs the information it receives.
-
-In a separate terminal, run the following command (Note that the `storm` 
executable must be on your PATH):
-
-```bash
-storm jar ./examples/target/storm-mqtt-examples-*-SNAPSHOT.jar 
org.apache.storm.flux.Flux ./examples/src/main/flux/sample.yaml --local
-```
-
-You should see data from MQTT being logged by the bolt:
-
-```
-27020 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - 
{user=tgoetz, deviceId=1234, location=office, temperature=67.0, humidity=65.0}
-27030 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - 
{user=tgoetz, deviceId=1234, location=office, temperature=47.0, humidity=85.0}
-27040 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - 
{user=tgoetz, deviceId=1234, location=office, temperature=69.0, humidity=94.0}
-27049 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - 
{user=tgoetz, deviceId=1234, location=office, temperature=4.0, humidity=98.0}
-27059 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - 
{user=tgoetz, deviceId=1234, location=office, temperature=51.0, humidity=12.0}
-27069 [Thread-17-log-executor[3 3]] INFO  o.a.s.f.w.b.LogInfoBolt - 
{user=tgoetz, deviceId=1234, location=office, temperature=27.0, humidity=65.0}
-```
-
-Either allow the local cluster to exit, or stop it by typing Cntrl-C.
-
-**MQTT Fault Tolerance In Action**
-
-After the toplogy has been shutdown, the MQTT subscription created by the MQTT 
spout will persist with the broker,
-and it will continue to receive and queue messages (as long as the broker is 
running).
-
-If you run the toplogy again (while the broker is still running), when the 
spout initially connects to the MQTT broker,
-it will receive all the messages it missed while it was down. You should see 
this as burst of messages, followed by a 
-rate of about two messages per second.
-
-This happens because, by default, the MQTT Spout creates a *session* when it 
subscribes -- that means it requests that
-the broker hold onto and redeliver any messages it missed while offline. 
Another important factor is the the 
-`MqttBrokerPublisher` publishes messages with a MQTT QoS of `1`, meaning *at 
least once delivery*.
-
-For more information about MQTT fault tolerance, see the **Delivery 
Guarantees** section below.
-
-
-
-## Delivery Guarantees
-In Storm terms, ***the MQTT Spout provides at least once delivery***, 
depending on the configuration of the publisher as
-well as the MQTT spout.
-
-The MQTT protocol defines the following QoS levels:
-
-* `0` - At Most Once (AKA "Fire and Forget")
-* `1` - At Least Once
-* `2` - Exactly Once
-
-This can be a little confusing as the MQTT protocol specification does not 
really address the concept of a node being 
-completely incinerated by a catasrophic event. This is in stark contrast with 
Storm's reliability model, which expects 
-and embraces the concept of node failure.
-
-So resiliancy is ultimately dependent on the underlying MQTT implementation 
and infrastructure.
-
-###Recommendations
-
-*You will never get at exactly once processing with this spout. It can be used 
with Trident, but it won't provide 
-transational semantics. You will only get at least once guarantees.*
-
-If you need reliability guarantees (i.e. *at least once processing*):
-
-1. For MQTT publishers (outside of Storm), publish messages with a QoS of `1` 
so the broker saves messages if/when the 
-spout is offline.
-2. Use the spout defaults (`cleanSession = false` and `qos = 1`)
-3. If you can, make sure any result of receiving and MQTT message is 
idempotent.
-4. Make sure your MQTT brokers don't die or get isolated due to a network 
partition. Be prepared for natural and 
-man-made disasters and network partitions. Incineration and destruction 
happens.
-
-
-
-
-
-## Configuration
-For the full range of configuration options, see the JavaDoc for 
`org.apache.storm.mqtt.common.MqttOptions`.
-
-### Message Mappers
-To define how MQTT messages are mapped to Storm tuples, you configure the MQTT 
spout with an implementation of the 
-`org.apache.storm.mqtt.MqttMessageMapper` interface, which looks like this:
-
-```java
-public interface MqttMessageMapper extends Serializable {
-
-    Values toValues(MqttMessage message);
-
-    Fields outputFields();
-}
-```
-
-The `MqttMessage` class contains the topic to which the message was published 
(`String`) and the message payload 
-(`byte[]`). For example, here is a `MqttMessageMapper` implementation that 
produces tuples based on the content of both
-the message topic and payload:
-
-```java
-/**
- * Given a topic name: "users/{user}/{location}/{deviceId}"
- * and a payload of "{temperature}/{humidity}"
- * emits a tuple containing user(String), deviceId(String), location(String), 
temperature(float), humidity(float)
- *
- */
-public class CustomMessageMapper implements MqttMessageMapper {
-    private static final Logger LOG = 
LoggerFactory.getLogger(CustomMessageMapper.class);
-
-
-    public Values toValues(MqttMessage message) {
-        String topic = message.getTopic();
-        String[] topicElements = topic.split("/");
-        String[] payloadElements = new String(message.getMessage()).split("/");
-
-        return new Values(topicElements[2], topicElements[4], 
topicElements[3], Float.parseFloat(payloadElements[0]), 
-                Float.parseFloat(payloadElements[1]));
-    }
-
-    public Fields outputFields() {
-        return new Fields("user", "deviceId", "location", "temperature", 
"humidity");
-    }
-}
-```
-
-### Tuple Mappers
-When publishing MQTT messages with the MQTT bolt or Trident function, you need 
to map Storm tuple data to MQTT messages 
-(topic/payload). This is done by implementing the 
`org.apache.storm.mqtt.MqttTupleMapper` interface:
-
-```java
-public interface MqttTupleMapper extends Serializable{
-
-    MqttMessage toMessage(ITuple tuple);
-
-}
-```
-
-For example, a simple `MqttTupleMapper` implementation might look like this:
-
-```java
-public class MyTupleMapper implements MqttTupleMapper {
-    public MqttMessage toMessage(ITuple tuple) {
-        String topic = "users/" + tuple.getStringByField("userId") + "/" + 
tuple.getStringByField("device");
-        byte[] payload = tuple.getStringByField("message").getBytes();
-        return new MqttMessage(topic, payload);
-    }
-}
-```
-
-### MQTT Spout Parallelism
-It's recommended that you use a parallelism of 1 for the MQTT spout, otherwise 
you will end up with multiple instances
-of the spout subscribed to the same topic(s), resulting in duplicate messages.
-
-If you want to parallelize the spout, it's recommended that you use multiple 
instances of the spout in your topolgoy 
-and use MQTT topic selectors to parition the data. How you implement the 
partitioning strategy is ultimately determined 
-by your MQTT topic structure. As an example, if you had topics partitioned by 
region (e.g. east/west) you could do 
-something like the following:
-
-```java
-String spout1Topic = "users/east/#";
-String spout2Topic = "users/west/#";
-```
-
-and then join the resulting streams together by subscribing a bolt to each 
stream.
-
-
-### Using Flux
-
-The following Flux YAML configuration creates the toplolgy used in the example:
-
-```yaml
-name: "mqtt-topology"
-
-components:
-   ########## MQTT Spout Config ############
-  - id: "mqtt-type"
-    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
-
-  - id: "mqtt-options"
-    className: "org.apache.storm.mqtt.common.MqttOptions"
-    properties:
-      - name: "url"
-        value: "tcp://localhost:1883"
-      - name: "topics"
-        value:
-          - "/users/tgoetz/#"
-
-# topology configuration
-config:
-  topology.workers: 1
-  topology.max.spout.pending: 1000
-
-# spout definitions
-spouts:
-  - id: "mqtt-spout"
-    className: "org.apache.storm.mqtt.spout.MqttSpout"
-    constructorArgs:
-      - ref: "mqtt-type"
-      - ref: "mqtt-options"
-    parallelism: 1
-
-# bolt definitions
-bolts:
-  - id: "log"
-    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
-    parallelism: 1
-
-
-streams:
-  - from: "mqtt-spout"
-    to: "log"
-    grouping:
-      type: SHUFFLE
-
-```
-
-
-### Using Java
-
-Similarly, you can create the same topology using the Storm Core Java API:
-
-```java
-TopologyBuilder builder = new TopologyBuilder();
-MqttOptions options = new MqttOptions();
-options.setTopics(Arrays.asList("/users/tgoetz/#"));
-options.setCleanConnection(false);
-MqttSpout spout = new MqttSpout(new StringMessageMapper(), options);
-
-MqttBolt bolt = new LogInfoBolt();
-
-builder.setSpout("mqtt-spout", spout);
-builder.setBolt("log-bolt", bolt).shuffleGrouping("mqtt-spout");
-
-return builder.createTopology();
-```
-
-## SSL/TLS
-If the MQTT broker you are connecting to requires SSL or SSL client 
authentication, you need to configure the spout 
-with an appropriate URI, and the location of keystore/truststore files 
containing the necessary certificates.
-
-### SSL/TLS URIs
-To connect over SSL/TLS use a URI with a prefix of `ssl://` or `tls://` 
instead of `tcp://`. For further control over
-the algorithm, you can specify a specific protocol:
-
- * `ssl://` Use the JVM default version of the SSL protocol.
- * `sslv*://` Use a specific version of the SSL protocol, where `*` is 
replaced by the version (e.g. `sslv3://`)
- * `tls://` Use the JVM default version of the TLS protocol.
- * `tlsv*://` Use a specific version of the TLS protocol, where `*` is 
replaced by the version (e.g. `tlsv1.1://`)
- 
- 
-### Specifying Keystore/Truststore Locations
- 
- The `MqttSpout`, `MqttBolt` and `MqttPublishFunction` all have constructors 
that take a `KeyStoreLoader` instance that
- is used to load the certificates required for TLS/SSL connections. For 
example:
- 
-```java
- public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader 
keyStoreLoader)
-```
- 
-The `DefaultKeyStoreLoader` class can be used to load certificates from the 
local filesystem. Note that the 
-keystore/truststore need to be available on all worker nodes where the 
spout/bolt might be executed. To use 
-`DefaultKeyStoreLoader` you specify the location of the keystore/truststore 
file(s), and set the necessary passwords:
-
-```java
-DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks", 
"/path/to/truststore.jks");
-ksl.setKeyStorePassword("password");
-ksl.setTrustStorePassword("password");
-//...
-```
-
-If your keystore/truststore certificates are stored in a single file, you can 
use the one-argument constructor:
-
-```java
-DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks");
-ksl.setKeyStorePassword("password");
-//...
-```
-
-SSL/TLS can also be configured using Flux:
-
-```yaml
-name: "mqtt-topology"
-
-components:
-   ########## MQTT Spout Config ############
-  - id: "mqtt-type"
-    className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
-
-  - id: "keystore-loader"
-    className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"
-    constructorArgs:
-      - "keystore.jks"
-      - "truststore.jks"
-    properties:
-      - name: "keyPassword"
-        value: "password"
-      - name: "keyStorePassword"
-        value: "password"
-      - name: "trustStorePassword"
-        value: "password"
-
-  - id: "mqtt-options"
-    className: "org.apache.storm.mqtt.common.MqttOptions"
-    properties:
-      - name: "url"
-        value: "ssl://raspberrypi.local:8883"
-      - name: "topics"
-        value:
-          - "/users/tgoetz/#"
-
-# topology configuration
-config:
-  topology.workers: 1
-  topology.max.spout.pending: 1000
-
-# spout definitions
-spouts:
-  - id: "mqtt-spout"
-    className: "org.apache.storm.mqtt.spout.MqttSpout"
-    constructorArgs:
-      - ref: "mqtt-type"
-      - ref: "mqtt-options"
-      - ref: "keystore-loader"
-    parallelism: 1
-
-# bolt definitions
-bolts:
-
-  - id: "log"
-    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
-    parallelism: 1
-
-
-streams:
-
-  - from: "mqtt-spout"
-    to: "log"
-    grouping:
-      type: SHUFFLE
-
-```
-
-## Committer Sponsors
-
- * P. Taylor Goetz ([[email protected]](mailto:[email protected]))
\ No newline at end of file
diff --git a/external/storm-mqtt/pom.xml b/external/storm-mqtt/pom.xml
deleted file mode 100644
index 11377fd52..000000000
--- a/external/storm-mqtt/pom.xml
+++ /dev/null
@@ -1,148 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>storm-mqtt</artifactId>
-    <packaging>jar</packaging>
-
-    <name>storm-mqtt</name>
-
-    <parent>
-        <groupId>org.apache.storm</groupId>
-        <artifactId>storm</artifactId>
-        <version>2.6.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-    
-    <repositories>
-        <repository>
-            <id>bintray</id>
-            <url>https://dl.bintray.com/andsel/maven/</url>
-            <releases>
-                <enabled>true</enabled>
-            </releases>
-            <snapshots>
-                <enabled>false</enabled>
-            </snapshots>
-        </repository>
-    </repositories>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-broker</artifactId>
-            <version>${activemq.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <!-- required to bypass failure in mvn deploy -->
-                    <groupId>org.apache.activemq</groupId>
-                    <artifactId>activemq-protobuf</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-mqtt</artifactId>
-            <version>${activemq.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-kahadb-store</artifactId>
-            <version>${activemq.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-client</artifactId>
-            <version>${project.version}</version>
-            <scope>${provided.scope}</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.fusesource.mqtt-client</groupId>
-            <artifactId>mqtt-client</artifactId>
-            <version>1.16</version>
-        </dependency>
-        <dependency>
-            <groupId>commons-lang</groupId>
-            <artifactId>commons-lang</artifactId>
-            <version>2.5</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-client</artifactId>
-            <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-    <build>
-        <plugins>
-            <plugin>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-                    <enableAssertions>false</enableAssertions>
-                    <redirectTestOutputToFile>true</redirectTestOutputToFile>
-                    <excludedGroups>${java.unit.test.exclude}</excludedGroups>
-                    <includes>
-                        <include>${java.unit.test.include}</include>
-                    </includes>
-                    <argLine>-Djava.net.preferIPv4Stack=true -Xmx1536m</argLine>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-failsafe-plugin</artifactId>
-                <configuration>
-                    <enableAssertions>false</enableAssertions>
-                    <redirectTestOutputToFile>true</redirectTestOutputToFile>
-                    <includes>
-                        <include>${java.integration.test.include}</include>
-                    </includes>
-                    <groups>${java.integration.test.group}</groups>  <!--set in integration-test the profile-->
-                    <argLine>-Djava.net.preferIPv4Stack=true -Xmx1536m</argLine>
-                </configuration>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>integration-test</goal>
-                            <goal>verify</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-checkstyle-plugin</artifactId>
-                <!--Note - the version would be inherited-->
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-pmd-plugin</artifactId>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java
deleted file mode 100644
index 3644d16c7..000000000
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt;
-
-import org.fusesource.mqtt.client.Tracer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wrapper around SLF4J logger that allows MQTT messages to be logged.
- */
-public class MqttLogger extends Tracer {
-    private static final Logger LOG = 
LoggerFactory.getLogger(MqttLogger.class);
-
-    @Override
-    public void debug(String message, Object... args) {
-        LOG.debug(String.format(message, args));
-    }
-
-}
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java
deleted file mode 100644
index 90401efba..000000000
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt;
-
-/**
- * Represents an MQTT Message consisting of a topic string (e.g. 
"/users/ptgoetz/office/thermostat")
- * and a byte array message/payload.
- *
- */
-public class MqttMessage {
-    private String topic;
-    private byte[] message;
-
-
-    public MqttMessage(String topic, byte[] payload) {
-        this.topic = topic;
-        this.message = payload;
-    }
-
-    public byte[] getMessage() {
-        return this.message;
-    }
-
-    public String getTopic() {
-        return this.topic;
-    }
-}
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
deleted file mode 100644
index caeb6057e..000000000
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt;
-
-import java.io.Serializable;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-/**
- * Represents an object that can be converted to a Storm Tuple from an 
AckableMessage,
- * given a MQTT Topic Name and a byte array payload.
- */
-public interface MqttMessageMapper extends Serializable {
-    /**
-     * Convert a `MqttMessage` to a set of Values that can be emitted as a 
Storm Tuple.
-     *
-     * @param message An MQTT Message.
-     * @return Values representing a Storm Tuple.
-     */
-    Values toValues(MqttMessage message);
-
-    /**
-     * Returns the list of output fields this Mapper produces.
-     *
-     * @return the list of output fields this mapper produces.
-     */
-    Fields outputFields();
-}
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
deleted file mode 100644
index 58473b7a4..000000000
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt;
-
-import java.io.Serializable;
-
-import org.apache.storm.tuple.ITuple;
-
-/**
- * Given a Tuple, converts it to an MQTT message.
- */
-public interface MqttTupleMapper extends Serializable {
-
-    /**
-     * Converts a Tuple to a MqttMessage.
-     * @param tuple the incoming tuple
-     * @return the message to publish
-     */
-    MqttMessage toMessage(ITuple tuple);
-
-}
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
deleted file mode 100644
index 745607b9e..000000000
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt.bolt;
-
-import java.util.Map;
-import org.apache.storm.Config;
-import org.apache.storm.mqtt.MqttMessage;
-import org.apache.storm.mqtt.MqttTupleMapper;
-import org.apache.storm.mqtt.common.MqttOptions;
-import org.apache.storm.mqtt.common.MqttPublisher;
-import org.apache.storm.mqtt.common.SslUtils;
-import org.apache.storm.mqtt.ssl.KeyStoreLoader;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class MqttBolt extends BaseTickTupleAwareRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(MqttBolt.class);
-    private MqttTupleMapper mapper;
-    private transient MqttPublisher publisher;
-    private boolean retain = false;
-    private transient OutputCollector collector;
-    private MqttOptions options;
-    private KeyStoreLoader keyStoreLoader;
-    private transient String topologyName;
-
-
-    public MqttBolt(MqttOptions options, MqttTupleMapper mapper) {
-        this(options, mapper, null, false);
-    }
-
-    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, boolean 
retain) {
-        this(options, mapper, null, retain);
-    }
-
-    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, 
KeyStoreLoader keyStoreLoader) {
-        this(options, mapper, keyStoreLoader, false);
-    }
-
-    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, 
KeyStoreLoader keyStoreLoader, boolean retain) {
-        this.options = options;
-        this.mapper = mapper;
-        this.retain = retain;
-        this.keyStoreLoader = keyStoreLoader;
-        // the following code is duplicated in the constructor of MqttPublisher
-        // we reproduce it here so we fail on the client side if SSL is 
misconfigured, rather than when the topology
-        // is deployed to the cluster
-        SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
-    }
-
-    @Override
-    public void prepare(Map<String, Object> conf, TopologyContext context, 
OutputCollector collector) {
-        this.collector = collector;
-        this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
-        this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, 
this.retain);
-        try {
-            this.publisher.connectMqtt(this.topologyName + "-" + 
context.getThisComponentId() + "-" + context.getThisTaskId());
-        } catch (Exception e) {
-            LOG.error("Unable to connect to MQTT Broker.", e);
-            throw new RuntimeException("Unable to connect to MQTT Broker.", e);
-        }
-    }
-
-    @Override
-    protected void process(Tuple input) {
-        MqttMessage message = this.mapper.toMessage(input);
-        try {
-            this.publisher.publish(message);
-            this.collector.ack(input);
-        } catch (Exception e) {
-            LOG.warn("Error publishing MQTT message. Failing tuple.", e);
-            // should we fail the tuple or kill the worker?
-            collector.reportError(e);
-            collector.fail(input);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        // this bolt does not emit tuples
-    }
-}
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
deleted file mode 100644
index 90c922247..000000000
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt.common;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * MQTT Configuration Options.
- */
-public class MqttOptions implements Serializable {
-    private String url = "tcp://localhost:1883";
-    private List<String> topics = null;
-    private boolean cleanConnection = false;
-
-    private String willTopic;
-    private String willPayload;
-    private int willQos = 1;
-    private boolean willRetain = false;
-
-    private long reconnectDelay = 10;
-    private long reconnectDelayMax = 30 * 1000;
-    private double reconnectBackOffMultiplier = 2.0f;
-    private long reconnectAttemptsMax = -1;
-    private long connectAttemptsMax = -1;
-
-    private String userName = "";
-    private String password = "";
-
-    private int qos = 1;
-
-    public String getUrl() {
-        return url;
-    }
-
-    /**
-     * Sets the url for connecting to the MQTT broker, e.g. {@code 
tcp://localhost:1883}.
-     */
-    public void setUrl(String url) {
-        this.url = url;
-    }
-
-    public List<String> getTopics() {
-        return topics;
-    }
-
-    /**
-     * A list of MQTT topics to subscribe to.
-     */
-    public void setTopics(List<String> topics) {
-        this.topics = topics;
-    }
-
-    public boolean isCleanConnection() {
-        return cleanConnection;
-    }
-
-    /**
-     * Set to false if you want the MQTT server to persist topic subscriptions 
and ack positions across client sessions.
-     * Defaults to false.
-     */
-    public void setCleanConnection(boolean cleanConnection) {
-        this.cleanConnection = cleanConnection;
-    }
-
-    public String getWillTopic() {
-        return willTopic;
-    }
-
-    /**
-     * If set the server will publish the client's Will message to the 
specified topics if the client has an unexpected
-     * disconnection.
-     */
-    public void setWillTopic(String willTopic) {
-        this.willTopic = willTopic;
-    }
-
-    public String getWillPayload() {
-        return willPayload;
-    }
-
-    /**
-     * The Will message to send. Defaults to a zero length message.
-     */
-    public void setWillPayload(String willPayload) {
-        this.willPayload = willPayload;
-    }
-
-    public long getReconnectDelay() {
-        return reconnectDelay;
-    }
-
-    /**
-     * How long to wait in ms before the first reconnect attempt. Defaults to 
10.
-     */
-    public void setReconnectDelay(long reconnectDelay) {
-        this.reconnectDelay = reconnectDelay;
-    }
-
-    public long getReconnectDelayMax() {
-        return reconnectDelayMax;
-    }
-
-    /**
-     * The maximum amount of time in ms to wait between reconnect attempts. 
Defaults to 30,000.
-     */
-    public void setReconnectDelayMax(long reconnectDelayMax) {
-        this.reconnectDelayMax = reconnectDelayMax;
-    }
-
-    public double getReconnectBackOffMultiplier() {
-        return reconnectBackOffMultiplier;
-    }
-
-    /**
-     * The Exponential backoff be used between reconnect attempts. Set to 1 to 
disable exponential backoff. Defaults to
-     * 2.
-     */
-    public void setReconnectBackOffMultiplier(double 
reconnectBackOffMultiplier) {
-        this.reconnectBackOffMultiplier = reconnectBackOffMultiplier;
-    }
-
-    public long getReconnectAttemptsMax() {
-        return reconnectAttemptsMax;
-    }
-
-    /**
-     * The maximum number of reconnect attempts before an error is reported 
back to the client after a server
-     * connection had previously been established. Set to -1 to use unlimited 
attempts. Defaults to -1.
-     */
-    public void setReconnectAttemptsMax(long reconnectAttemptsMax) {
-        this.reconnectAttemptsMax = reconnectAttemptsMax;
-    }
-
-    public long getConnectAttemptsMax() {
-        return connectAttemptsMax;
-    }
-
-    /**
-     * The maximum number of reconnect attempts before an error is reported 
back to the client on the first attempt by
-     * the client to connect to a server. Set to -1 to use unlimited attempts. 
Defaults to -1.
-     */
-    public void setConnectAttemptsMax(long connectAttemptsMax) {
-        this.connectAttemptsMax = connectAttemptsMax;
-    }
-
-    public String getUserName() {
-        return userName;
-    }
-
-    /**
-     * The username for authenticated sessions.
-     */
-    public void setUserName(String userName) {
-        this.userName = userName;
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    /**
-     * The password for authenticated sessions.
-     */
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    public int getQos() {
-        return this.qos;
-    }
-
-    /**
-     * Sets the quality of service to use for MQTT messages. Defaults to 1 (at 
least once).
-     */
-    public void setQos(int qos) {
-        if (qos < 0 || qos > 2) {
-            throw new IllegalArgumentException("MQTT QoS must be >= 0 and <= 2");
-        }
-        this.qos = qos;
-    }
-
-    public int getWillQos() {
-        return this.willQos;
-    }
-
-    /**
-     * Sets the quality of service to use for the MQTT Will message. Defaults 
to 1 (at least once).
-     */
-    public void setWillQos(int qos) {
-        if (qos < 0 || qos > 2) {
-            throw new IllegalArgumentException("MQTT Will QoS must be >= 0 and <= 2");
-        }
-        this.willQos = qos;
-    }
-
-    public boolean getWillRetain() {
-        return this.willRetain;
-    }
-
-    /**
-     * Set to true if you want the Will message to be published with the 
retain option.
-     */
-    public void setWillRetain(boolean retain) {
-        this.willRetain = retain;
-    }
-
-    public static class Builder {
-        private MqttOptions options = new MqttOptions();
-
-        public Builder url(String url) {
-            this.options.url = url;
-            return this;
-        }
-
-
-        public Builder topics(List<String> topics) {
-            this.options.topics = topics;
-            return this;
-        }
-
-        public Builder cleanConnection(boolean cleanConnection) {
-            this.options.cleanConnection = cleanConnection;
-            return this;
-        }
-
-        public Builder willTopic(String willTopic) {
-            this.options.willTopic = willTopic;
-            return this;
-        }
-
-        public Builder willPayload(String willPayload) {
-            this.options.willPayload = willPayload;
-            return this;
-        }
-
-        public Builder willRetain(boolean retain) {
-            this.options.willRetain = retain;
-            return this;
-        }
-
-        public Builder willQos(int qos) {
-            this.options.setWillQos(qos);
-            return this;
-        }
-
-        public Builder reconnectDelay(long reconnectDelay) {
-            this.options.reconnectDelay = reconnectDelay;
-            return this;
-        }
-
-        public Builder reconnectDelayMax(long reconnectDelayMax) {
-            this.options.reconnectDelayMax = reconnectDelayMax;
-            return this;
-        }
-
-        public Builder reconnectBackOffMultiplier(double 
reconnectBackOffMultiplier) {
-            this.options.reconnectBackOffMultiplier = 
reconnectBackOffMultiplier;
-            return this;
-        }
-
-        public Builder reconnectAttemptsMax(long reconnectAttemptsMax) {
-            this.options.reconnectAttemptsMax = reconnectAttemptsMax;
-            return this;
-        }
-
-        public Builder connectAttemptsMax(long connectAttemptsMax) {
-            this.options.connectAttemptsMax = connectAttemptsMax;
-            return this;
-        }
-
-        public Builder userName(String userName) {
-            this.options.userName = userName;
-            return this;
-        }
-
-        public Builder password(String password) {
-            this.options.password = password;
-            return this;
-        }
-
-        public Builder qos(int qos) {
-            this.options.setQos(qos);
-            return this;
-        }
-
-        public MqttOptions build() {
-            return this.options;
-        }
-    }
-}
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
deleted file mode 100644
index 1f70b9315..000000000
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt.common;
-
-import org.apache.storm.mqtt.MqttMessage;
-import org.apache.storm.mqtt.ssl.KeyStoreLoader;
-
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.QoS;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MqttPublisher {
-    private static final Logger LOG = 
LoggerFactory.getLogger(MqttPublisher.class);
-
-    private MqttOptions options;
-    private transient BlockingConnection connection;
-    private KeyStoreLoader keyStoreLoader;
-    private QoS qos;
-    private boolean retain = false;
-
-
-    public MqttPublisher(MqttOptions options) {
-        this(options, null, false);
-    }
-
-    public MqttPublisher(MqttOptions options, boolean retain) {
-        this(options, null, retain);
-    }
-
-    public MqttPublisher(MqttOptions options, KeyStoreLoader keyStoreLoader, 
boolean retain) {
-        this.retain = retain;
-        this.options = options;
-        this.keyStoreLoader = keyStoreLoader;
-        SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
-        this.qos = MqttUtils.qosFromInt(this.options.getQos());
-    }
-
-    public void publish(MqttMessage message) throws Exception {
-        this.connection.publish(message.getTopic(), message.getMessage(), 
this.qos, this.retain);
-    }
-
-    public void connectMqtt(String clientId) throws Exception {
-        MQTT client = MqttUtils.configureClient(this.options, clientId, 
this.keyStoreLoader);
-        this.connection = client.blockingConnection();
-        this.connection.connect();
-    }
-}
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
deleted file mode 100644
index 029b4c6a5..000000000
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt.common;
-
-import java.net.URI;
-
-import org.apache.storm.mqtt.MqttLogger;
-import org.apache.storm.mqtt.ssl.KeyStoreLoader;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.QoS;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MqttUtils {
-    private static final Logger LOG = LoggerFactory.getLogger(MqttUtils.class);
-
-    private MqttUtils() {}
-
-    public static QoS qosFromInt(int i) {
-        QoS qos = null;
-        switch (i) {
-            case 0:
-                qos = QoS.AT_MOST_ONCE;
-                break;
-            case 1:
-                qos = QoS.AT_LEAST_ONCE;
-                break;
-            case 2:
-                qos = QoS.EXACTLY_ONCE;
-                break;
-            default:
-                throw new IllegalArgumentException(i + "is not a valid MQTT 
QoS.");
-        }
-        return qos;
-    }
-
-
-    public static MQTT configureClient(MqttOptions options, String clientId, 
KeyStoreLoader keyStoreLoader)
-        throws Exception {
-
-        MQTT client = new MQTT();
-        URI uri = URI.create(options.getUrl());
-
-        client.setHost(uri);
-        if (!uri.getScheme().toLowerCase().equals("tcp")) {
-            client.setSslContext(SslUtils.sslContext(uri.getScheme(), 
keyStoreLoader));
-        }
-        client.setClientId(clientId);
-        LOG.info("MQTT ClientID: {}", client.getClientId().toString());
-        client.setCleanSession(options.isCleanConnection());
-
-        client.setReconnectDelay(options.getReconnectDelay());
-        client.setReconnectDelayMax(options.getReconnectDelayMax());
-        
client.setReconnectBackOffMultiplier(options.getReconnectBackOffMultiplier());
-        client.setConnectAttemptsMax(options.getConnectAttemptsMax());
-        client.setReconnectAttemptsMax(options.getReconnectAttemptsMax());
-
-
-        client.setUserName(options.getUserName());
-        client.setPassword(options.getPassword());
-        client.setTracer(new MqttLogger());
-
-        if (options.getWillTopic() != null && options.getWillPayload() != 
null) {
-            QoS qos = MqttUtils.qosFromInt(options.getWillQos());
-            client.setWillQos(qos);
-            client.setWillTopic(options.getWillTopic());
-            client.setWillMessage(options.getWillPayload());
-            client.setWillRetain(options.getWillRetain());
-        }
-        return client;
-    }
-}
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
deleted file mode 100644
index 83f311fbe..000000000
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt.common;
-
-import java.net.URI;
-import java.security.KeyStore;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import org.apache.storm.mqtt.ssl.KeyStoreLoader;
-
-public class SslUtils {
-    private SslUtils() {}
-
-    public static void checkSslConfig(String url, KeyStoreLoader loader) {
-        URI uri = URI.create(url);
-        String scheme = uri.getScheme().toLowerCase();
-        if (!(scheme.equals("tcp") || scheme.startsWith("tls") || 
scheme.startsWith("ssl"))) {
-            throw new IllegalArgumentException("Unrecognized URI scheme: " + 
scheme);
-        }
-        if (!scheme.equalsIgnoreCase("tcp") && loader == null) {
-            throw new IllegalStateException("A TLS/SSL MQTT URL was specified, 
but no KeyStoreLoader configured. "
-                    + "A KeyStoreLoader implementation is required when using 
TLS/SSL.");
-        }
-    }
-
-    public static SSLContext sslContext(String scheme, KeyStoreLoader 
keyStoreLoader) throws Exception {
-        KeyStore ks = KeyStore.getInstance("JKS");
-        ks.load(keyStoreLoader.keyStoreInputStream(), 
keyStoreLoader.keyStorePassword().toCharArray());
-
-        KeyStore ts = KeyStore.getInstance("JKS");
-        ts.load(keyStoreLoader.trustStoreInputStream(), 
keyStoreLoader.trustStorePassword().toCharArray());
-
-        KeyManagerFactory kmf = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-        kmf.init(ks, keyStoreLoader.keyPassword().toCharArray());
-
-        TrustManagerFactory tmf = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
-        tmf.init(ts);
-
-        SSLContext sc = SSLContext.getInstance(scheme.toUpperCase());
-        TrustManager[] trustManagers = tmf.getTrustManagers();
-        sc.init(kmf.getKeyManagers(), trustManagers, null);
-
-        return sc;
-    }
-}
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
deleted file mode 100644
index a943f8653..000000000
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt.mappers;
-
-import org.apache.storm.mqtt.MqttMessage;
-import org.apache.storm.mqtt.MqttMessageMapper;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-
-public class ByteArrayMessageMapper implements MqttMessageMapper {
-    @Override
-    public Values toValues(MqttMessage message) {
-        return new Values(message.getTopic(), message.getMessage());
-    }
-
-    @Override
-    public Fields outputFields() {
-        return new Fields("topic", "message");
-    }
-}
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
deleted file mode 100644
index 3f76b6177..000000000
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt.mappers;
-
-import org.apache.storm.mqtt.MqttMessage;
-import org.apache.storm.mqtt.MqttMessageMapper;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-/**
- * Given a String topic and byte[] message, emits a tuple with fields
- * "topic" and "message", both of which are Strings.
- */
-public class StringMessageMapper implements MqttMessageMapper {
-    @Override
-    public Values toValues(MqttMessage message) {
-        return new Values(message.getTopic(), new 
String(message.getMessage()));
-    }
-
-    @Override
-    public Fields outputFields() {
-        return new Fields("topic", "message");
-    }
-}
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
deleted file mode 100644
index 0b9144215..000000000
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt.spout;
-
-import org.apache.commons.lang.builder.EqualsBuilder;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.storm.mqtt.MqttMessage;
-
-/**
- * Represents an MQTT Message consisting of a topic string (e.g. 
"/users/ptgoetz/office/thermostat")
- * and a byte array message/payload.
- *
- */
-class AckableMessage {
-    private String topic;
-    private byte[] message;
-    private Runnable ack;
-
-    AckableMessage(String topic, byte[] message, Runnable ack) {
-        this.topic = topic;
-        this.message = message;
-        this.ack = ack;
-    }
-
-    public MqttMessage getMessage() {
-        return new MqttMessage(this.topic, this.message);
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder(71, 123)
-            .append(this.topic)
-            .append(this.message)
-            .toHashCode();
-    }
-
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj == null) {
-            return false;
-        }
-        if (obj == this) {
-            return true;
-        }
-        if (obj.getClass() != getClass()) {
-            return false;
-        }
-        AckableMessage tm = (AckableMessage) obj;
-        return new EqualsBuilder()
-            .appendSuper(super.equals(obj))
-            .append(this.topic, tm.topic)
-            .append(this.message, tm.message)
-            .isEquals();
-    }
-
-    Runnable ack() {
-        return this.ack;
-    }
-}
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
deleted file mode 100644
index 265a6f97c..000000000
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt.spout;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.storm.Config;
-import org.apache.storm.mqtt.MqttMessageMapper;
-import org.apache.storm.mqtt.common.MqttOptions;
-import org.apache.storm.mqtt.common.MqttUtils;
-import org.apache.storm.mqtt.common.SslUtils;
-import org.apache.storm.mqtt.ssl.KeyStoreLoader;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.UTF8Buffer;
-import org.fusesource.mqtt.client.Callback;
-import org.fusesource.mqtt.client.CallbackConnection;
-import org.fusesource.mqtt.client.Listener;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.QoS;
-import org.fusesource.mqtt.client.Topic;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MqttSpout implements IRichSpout, Listener {
-    private static final Logger LOG = LoggerFactory.getLogger(MqttSpout.class);
-    protected transient SpoutOutputCollector collector;
-    protected transient TopologyContext context;
-    protected transient LinkedBlockingQueue<AckableMessage> incoming;
-    protected transient HashMap<Long, AckableMessage> pending;
-    protected MqttMessageMapper type;
-    protected MqttOptions options;
-    protected KeyStoreLoader keyStoreLoader;
-    private String topologyName;
-    private CallbackConnection connection;
-    private transient Map<String, Object> conf;
-    private boolean mqttConnected = false;
-    private boolean mqttConnectFailed = false;
-
-
-    private Long sequence = Long.MIN_VALUE;
-
-    protected MqttSpout() {}
-
-    public MqttSpout(MqttMessageMapper type, MqttOptions options) {
-        this(type, options, null);
-    }
-
-    public MqttSpout(MqttMessageMapper type, MqttOptions options, 
KeyStoreLoader keyStoreLoader) {
-        this.type = type;
-        this.options = options;
-        this.keyStoreLoader = keyStoreLoader;
-        SslUtils.checkSslConfig(this.options.getUrl(), this.keyStoreLoader);
-    }
-
-    private Long nextId() {
-        this.sequence++;
-        if (this.sequence == Long.MAX_VALUE) {
-            this.sequence = Long.MIN_VALUE;
-        }
-        return this.sequence;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(this.type.outputFields());
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-    @Override
-    public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
-        this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
-
-        this.collector = collector;
-        this.context = context;
-        this.conf = conf;
-
-        this.incoming = new LinkedBlockingQueue<>();
-        this.pending = new HashMap<>();
-
-        try {
-            connectMqtt();
-        } catch (Exception e) {
-            this.collector.reportError(e);
-            throw new RuntimeException("MQTT Connection failed.", e);
-        }
-
-    }
-
-    private void connectMqtt() throws Exception {
-        String clientId = this.topologyName + "-"
-                + this.context.getThisComponentId() + "-"
-                + this.context.getThisTaskId();
-
-        MQTT client = MqttUtils.configureClient(this.options, clientId, 
this.keyStoreLoader);
-        this.connection = client.callbackConnection();
-        this.connection.listener(this);
-        this.connection.connect(new ConnectCallback());
-
-        while (!this.mqttConnected && !this.mqttConnectFailed) {
-            LOG.info("Waiting for connection...");
-            Thread.sleep(500);
-        }
-
-        if (this.mqttConnected) {
-            List<String> topicList = this.options.getTopics();
-            Topic[] topics = new Topic[topicList.size()];
-            QoS qos = MqttUtils.qosFromInt(this.options.getQos());
-            for (int i = 0; i < topicList.size(); i++) {
-                topics[i] = new Topic(topicList.get(i), qos);
-            }
-            connection.subscribe(topics, new SubscribeCallback());
-        }
-    }
-
-
-    @Override
-    public void close() {
-        this.connection.disconnect(new DisconnectCallback());
-    }
-
-    @Override
-    public void activate() {
-    }
-
-    @Override
-    public void deactivate() {
-    }
-
-    /**
-     * When this method is called, Storm is requesting that the Spout emit 
tuples to the
-     * output collector. This method should be non-blocking, so if the Spout 
has no tuples
-     * to emit, this method should return. nextTuple, ack, and fail are all 
called in a tight
-     * loop in a single thread in the spout task. When there are no tuples to 
emit, it is courteous
-     * to have nextTuple sleep for a short amount of time (like a single 
millisecond)
-     * so as not to waste too much CPU.
-     */
-    @Override
-    public void nextTuple() {
-        AckableMessage tm = this.incoming.poll();
-        if (tm != null) {
-            Long id = nextId();
-            this.collector.emit(this.type.toValues(tm.getMessage()), id);
-            this.pending.put(id, tm);
-        } else {
-            Thread.yield();
-        }
-
-    }
-
-    /**
-     * Storm has determined that the tuple emitted by this spout with the 
msgId identifier
-     * has been fully processed. Typically, an implementation of this method 
will take that
-     * message off the queue and prevent it from being replayed.
-     *
-     * @param msgId the id of the message to acknowledge
-     */
-    @Override
-    public void ack(Object msgId) {
-        AckableMessage msg = this.pending.remove(msgId);
-        this.connection.getDispatchQueue().execute(msg.ack());
-    }
-
-    /**
-     * The tuple emitted by this spout with the msgId identifier has failed to 
be
-     * fully processed. Typically, an implementation of this method will put 
that
-     * message back on the queue to be replayed at a later time.
-     *
-     * @param msgId the id of the failed message
-     */
-    @Override
-    public void fail(Object msgId) {
-        try {
-            this.incoming.put(this.pending.remove(msgId));
-        } catch (InterruptedException e) {
-            LOG.warn("Interrupted while re-queueing message.", e);
-        }
-    }
-
-
-    // ################# Listener Implementation ######################
-    @Override
-    public void onConnected() {
-        // this gets called repeatedly for no apparent reason, don't do 
anything
-    }
-
-    @Override
-    public void onDisconnected() {
-        // this gets called repeatedly for no apparent reason, don't do 
anything
-    }
-
-    @Override
-    public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
-        LOG.debug("Received message: topic={}, payload={}", topic.toString(), 
new String(payload.toByteArray()));
-        try {
-            this.incoming.put(new AckableMessage(topic.toString(), 
payload.toByteArray(), ack));
-        } catch (InterruptedException e) {
-            LOG.warn("Interrupted while queueing an MQTT message.");
-        }
-    }
-
-    @Override
-    public void onFailure(Throwable throwable) {
-        LOG.error("MQTT Connection Failure.", throwable);
-        MqttSpout.this.connection.disconnect(new DisconnectCallback());
-        throw new RuntimeException("MQTT Connection failure.", throwable);
-    }
-
-    // ################# Connect Callback Implementation ######################
-    private class ConnectCallback implements Callback<Void> {
-        @Override
-        public void onSuccess(Void v) {
-            LOG.info("MQTT Connected. Subscribing to topic...");
-            MqttSpout.this.mqttConnected = true;
-        }
-
-        @Override
-        public void onFailure(Throwable throwable) {
-            LOG.info("MQTT Connection failed.");
-            MqttSpout.this.mqttConnectFailed = true;
-        }
-    }
-
-    // ################# Subscribe Callback Implementation 
######################
-    private class SubscribeCallback implements Callback<byte[]> {
-        @Override
-        public void onSuccess(byte[] qos) {
-            LOG.info("Subscripton sucessful.");
-        }
-
-        @Override
-        public void onFailure(Throwable throwable) {
-            LOG.error("MQTT Subscripton failed.", throwable);
-            throw new RuntimeException("MQTT Subscribe failed.", throwable);
-        }
-    }
-
-    // ################# Subscribe Callback Implementation 
######################
-    private class DisconnectCallback implements Callback<Void> {
-        @Override
-        public void onSuccess(Void theVoid) {
-            LOG.info("MQTT Disconnect successful.");
-        }
-
-        @Override
-        public void onFailure(Throwable throwable) {
-            // Disconnects don't fail.
-        }
-    }
-
-}
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
deleted file mode 100644
index 14c3c6cd0..000000000
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt.ssl;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-
-/**
- * KeyStoreLoader implementation that uses local files.
- */
-public class DefaultKeyStoreLoader implements KeyStoreLoader {
-    private String ksFile = null;
-    private String tsFile = null;
-    private String keyStorePassword = "";
-    private String trustStorePassword = "";
-    private String keyPassword = "";
-
-    /**
-     * Creates a DefaultKeystoreLoader that uses the same file
-     * for both the keystore and truststore.
-     *
-     * @param keystore path to keystore file
-     */
-    public DefaultKeyStoreLoader(String keystore) {
-        this.ksFile = keystore;
-    }
-
-    /**
-     * Creates a DefaultKeystoreLoader that uses separate files
-     * for the keystore and truststore.
-     *
-     * @param keystore path to keystore file
-     * @param truststore path to truststore file
-     */
-    public DefaultKeyStoreLoader(String keystore, String truststore) {
-        this.ksFile = keystore;
-        this.tsFile = truststore;
-    }
-
-    public void setKeyStorePassword(String keyStorePassword) {
-        this.keyStorePassword = keyStorePassword;
-    }
-
-    public void setTrustStorePassword(String trustStorePassword) {
-        this.trustStorePassword = trustStorePassword;
-    }
-
-    public void setKeyPassword(String keyPassword) {
-        this.keyPassword = keyPassword;
-    }
-
-    @Override
-    public InputStream keyStoreInputStream() throws FileNotFoundException {
-        return new FileInputStream(this.ksFile);
-    }
-
-    @Override
-    public InputStream trustStoreInputStream() throws FileNotFoundException {
-        // if no truststore file, assume the truststore is the keystore.
-        if (this.tsFile == null) {
-            return new FileInputStream(this.ksFile);
-        } else {
-            return new FileInputStream(this.tsFile);
-        }
-    }
-
-    @Override
-    public String keyStorePassword() {
-        return this.keyStorePassword;
-    }
-
-    @Override
-    public String trustStorePassword() {
-        return this.trustStorePassword;
-    }
-
-    @Override
-    public String keyPassword() {
-        return this.keyPassword;
-    }
-}
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
deleted file mode 100644
index 5630ff80e..000000000
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt.ssl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-
-/**
- * Abstraction for loading keystore/truststore data. This allows keystores
- * to be loaded from different sources (File system, HDFS, etc.).
- */
-public interface KeyStoreLoader extends Serializable {
-
-    String keyStorePassword();
-
-    String trustStorePassword();
-
-    String keyPassword();
-
-    InputStream keyStoreInputStream() throws IOException;
-
-    InputStream trustStoreInputStream() throws IOException;
-}
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
deleted file mode 100644
index 329cff599..000000000
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt.trident;
-
-import java.util.Map;
-import org.apache.storm.Config;
-import org.apache.storm.mqtt.MqttMessage;
-import org.apache.storm.mqtt.MqttTupleMapper;
-import org.apache.storm.mqtt.common.MqttOptions;
-import org.apache.storm.mqtt.common.MqttPublisher;
-import org.apache.storm.mqtt.common.SslUtils;
-import org.apache.storm.mqtt.ssl.KeyStoreLoader;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MqttPublishFunction extends BaseFunction {
-    private static final Logger LOG = 
LoggerFactory.getLogger(MqttPublishFunction.class);
-    private MqttTupleMapper mapper;
-    private transient MqttPublisher publisher;
-    private boolean retain = false;
-    private transient OutputCollector collector;
-    private MqttOptions options;
-    private KeyStoreLoader keyStoreLoader;
-    private transient String topologyName;
-
-
-    public MqttPublishFunction(MqttOptions options, MqttTupleMapper mapper, 
KeyStoreLoader keyStoreLoader, boolean retain) {
-        this.options = options;
-        this.mapper = mapper;
-        this.retain = retain;
-        this.keyStoreLoader = keyStoreLoader;
-        // the following code is duplicated in the constructor of MqttPublisher
-        // we reproduce it here so we fail on the client side if SSL is 
misconfigured, rather than when the topology
-        // is deployed to the cluster
-        SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
-    }
-
-
-    @Override
-    public void prepare(Map<String, Object> conf, TridentOperationContext 
context) {
-        this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
-        this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, 
this.retain);
-        try {
-            this.publisher.connectMqtt(this.topologyName + "-" + 
context.getPartitionIndex());
-        } catch (Exception e) {
-            LOG.error("Unable to connect to MQTT Broker.", e);
-            throw new RuntimeException("Unable to connect to MQTT Broker.", e);
-        }
-    }
-
-    @Override
-    public void execute(TridentTuple tuple, TridentCollector collector) {
-        MqttMessage message = this.mapper.toMessage(tuple);
-        try {
-            this.publisher.publish(message);
-        } catch (Exception e) {
-            LOG.warn("Error publishing MQTT message. Failing tuple.", e);
-            // should we fail the batch or kill the worker?
-            throw new FailedException();
-        }
-    }
-}
diff --git 
a/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
 
b/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
deleted file mode 100644
index b823456af..000000000
--- 
a/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.mqtt;
-
-import java.io.Serializable;
-import java.net.URI;
-import java.util.Arrays;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.mqtt.bolt.MqttBolt;
-import org.apache.storm.mqtt.common.MqttOptions;
-import org.apache.storm.mqtt.common.MqttPublisher;
-import org.apache.storm.mqtt.mappers.StringMessageMapper;
-import org.apache.storm.mqtt.spout.MqttSpout;
-import org.apache.storm.testing.IntegrationTest;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.ITuple;
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.Message;
-import org.fusesource.mqtt.client.QoS;
-import org.fusesource.mqtt.client.Topic;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-@IntegrationTest
-public class StormMqttIntegrationTest implements Serializable {
-    private static final Logger LOG = 
LoggerFactory.getLogger(StormMqttIntegrationTest.class);
-    private static final String TEST_TOPIC = "/mqtt-topology";
-    private static final String RESULT_TOPIC = "/integration-result";
-    private static final String RESULT_PAYLOAD = "Storm MQTT Spout";
-    static boolean spoutActivated = false;
-    private static BrokerService broker;
-
-    @AfterAll
-    public static void cleanup() throws Exception {
-        broker.stop();
-    }
-
-    @BeforeAll
-    public static void start() throws Exception {
-        LOG.warn("Starting broker...");
-        broker = new BrokerService();
-        broker.addConnector("mqtt://localhost:1883");
-        broker.setDataDirectory("target");
-        broker.start();
-        LOG.debug("MQTT broker started");
-    }
-
-    @Test
-    public void testMqttTopology() throws Exception {
-        MQTT client = new MQTT();
-        client.setTracer(new MqttLogger());
-        URI uri = URI.create("tcp://localhost:1883");
-        client.setHost(uri);
-
-        client.setClientId("MQTTSubscriber");
-        client.setCleanSession(false);
-        BlockingConnection connection = client.blockingConnection();
-        connection.connect();
-        Topic[] topics = { new Topic("/integration-result", QoS.AT_LEAST_ONCE) 
};
-        byte[] qoses = connection.subscribe(topics);
-
-        try (LocalCluster cluster = new LocalCluster();
-             LocalTopology topo = cluster.submitTopology("test", new Config(), 
buildMqttTopology());) {
-
-            LOG.info("topology started");
-            while (!spoutActivated) {
-                Thread.sleep(500);
-            }
-
-            // publish a retained message to the broker
-            MqttOptions options = new MqttOptions();
-            options.setCleanConnection(false);
-            MqttPublisher publisher = new MqttPublisher(options, true);
-            publisher.connectMqtt("MqttPublisher");
-            publisher.publish(new MqttMessage(TEST_TOPIC, "test".getBytes()));
-
-            LOG.info("published message");
-
-            Message message = connection.receive();
-            LOG.info("Message recieved on topic: {}", message.getTopic());
-            LOG.info("Payload: {}", new String(message.getPayload()));
-            message.ack();
-
-            assertArrayEquals(message.getPayload(), RESULT_PAYLOAD.getBytes());
-            assertEquals(message.getTopic(), RESULT_TOPIC);
-        }
-    }
-
-    public StormTopology buildMqttTopology() {
-        TopologyBuilder builder = new TopologyBuilder();
-
-        MqttOptions options = new MqttOptions();
-        options.setTopics(Arrays.asList(TEST_TOPIC));
-        options.setCleanConnection(false);
-        TestSpout spout = new TestSpout(new StringMessageMapper(), options);
-
-        MqttBolt bolt = new MqttBolt(options, new MqttTupleMapper() {
-            @Override
-            public MqttMessage toMessage(ITuple tuple) {
-                LOG.info("Received: {}", tuple);
-                return new MqttMessage(RESULT_TOPIC, 
RESULT_PAYLOAD.getBytes());
-            }
-        });
-
-        builder.setSpout("mqtt-spout", spout);
-        builder.setBolt("mqtt-bolt", bolt).shuffleGrouping("mqtt-spout");
-
-        return builder.createTopology();
-    }
-
-    public static class TestSpout extends MqttSpout {
-        public TestSpout(MqttMessageMapper type, MqttOptions options) {
-            super(type, options);
-        }
-
-        @Override
-        public void activate() {
-            super.activate();
-            LOG.info("Spout activated.");
-            spoutActivated = true;
-        }
-    }
-
-}
diff --git a/pom.xml b/pom.xml
index 8d04950e1..32b49bed8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -495,7 +495,6 @@
                 <module>external/storm-elasticsearch</module>
                 <module>external/storm-solr</module>
                 <module>external/storm-metrics</module>
-                <module>external/storm-mqtt</module>
                 <module>external/storm-kafka-client</module>
                 <module>external/storm-kafka-migration</module>
                 <module>external/storm-opentsdb</module>
@@ -527,7 +526,6 @@
                 <module>examples/storm-hdfs-examples</module>
                 <module>examples/storm-hive-examples</module>
                 <module>examples/storm-elasticsearch-examples</module>
-                <module>examples/storm-mqtt-examples</module>
                 <module>examples/storm-pmml-examples</module>
                 <module>examples/storm-jms-examples</module>
                 <module>examples/storm-rocketmq-examples</module>

Reply via email to