This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch STORM-3988
in repository https://gitbox.apache.org/repos/asf/storm.git

commit e655c6d88ebbdfd13d4c9c23dee5a698751fd2b7
Author: Richard Zowalla <[email protected]>
AuthorDate: Thu Oct 19 09:00:22 2023 +0200

    STORM-3988 - Remove "storm-rocketmq"
---
 examples/storm-rocketmq-examples/pom.xml           |  96 ---------
 .../storm/rocketmq/topology/WordCountTopology.java |  89 --------
 .../storm/rocketmq/topology/WordCounter.java       |  68 ------
 .../storm/rocketmq/trident/WordCountTrident.java   |  89 --------
 external/storm-rocketmq/README.md                  | 150 --------------
 external/storm-rocketmq/pom.xml                    |  90 --------
 .../storm/rocketmq/ConsumerBatchMessage.java       |  65 ------
 .../rocketmq/DefaultMessageBodySerializer.java     |  38 ----
 .../storm/rocketmq/MessageBodySerializer.java      |  28 ---
 .../org/apache/storm/rocketmq/RocketMqConfig.java  | 178 ----------------
 .../org/apache/storm/rocketmq/RocketMqUtils.java   |  80 --------
 .../org/apache/storm/rocketmq/SpoutConfig.java     |  29 ---
 .../apache/storm/rocketmq/bolt/RocketMqBolt.java   | 216 -------------------
 .../mapper/FieldNameBasedTupleToMessageMapper.java |  70 -------
 .../common/mapper/TupleToMessageMapper.java        |  32 ---
 .../common/selector/DefaultTopicSelector.java      |  45 ----
 .../selector/FieldNameBasedTopicSelector.java      |  70 -------
 .../rocketmq/common/selector/TopicSelector.java    |  29 ---
 .../apache/storm/rocketmq/spout/RocketMqSpout.java | 228 ---------------------
 .../rocketmq/spout/scheme/KeyValueScheme.java      |  27 ---
 .../spout/scheme/StringKeyValueScheme.java         |  40 ----
 .../storm/rocketmq/spout/scheme/StringScheme.java  |  56 -----
 .../rocketmq/trident/state/RocketMqState.java      | 132 ------------
 .../trident/state/RocketMqStateFactory.java        |  43 ----
 .../trident/state/RocketMqStateUpdater.java        |  35 ----
 .../rocketmq/DefaultMessageBodySerializerTest.java |  36 ----
 .../java/org/apache/storm/rocketmq/TestUtils.java  |  45 ----
 .../storm/rocketmq/bolt/RocketMqBoltTest.java      |  67 ------
 .../FieldNameBasedTupleToMessageMapperTest.java    |  36 ----
 .../common/selector/DefaultTopicSelectorTest.java  |  40 ----
 .../selector/FieldNameBasedTopicSelectorTest.java  |  40 ----
 .../storm/rocketmq/spout/RocketMqSpoutTest.java    |  87 --------
 .../spout/scheme/StringKeyValueSchemeTest.java     |  63 ------
 pom.xml                                            |   2 -
 34 files changed, 2439 deletions(-)

diff --git a/examples/storm-rocketmq-examples/pom.xml b/examples/storm-rocketmq-examples/pom.xml
deleted file mode 100644
index c0ae8eb48..000000000
--- a/examples/storm-rocketmq-examples/pom.xml
+++ /dev/null
@@ -1,96 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>2.6.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>storm-rocketmq-examples</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-client</artifactId>
-            <version>${project.version}</version>
-            <scope>${provided.scope}</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-rocketmq</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <configuration>
-                    <createDependencyReducedPom>true</createDependencyReducedPom>
-                    <filters>
-                        <filter>
-                            <artifact>*:*</artifact>
-                            <excludes>
-                                <exclude>META-INF/*.SF</exclude>
-                                <exclude>META-INF/*.sf</exclude>
-                                <exclude>META-INF/*.DSA</exclude>
-                                <exclude>META-INF/*.dsa</exclude>
-                                <exclude>META-INF/*.RSA</exclude>
-                                <exclude>META-INF/*.rsa</exclude>
-                                <exclude>META-INF/*.EC</exclude>
-                                <exclude>META-INF/*.ec</exclude>
-                                <exclude>META-INF/MSFTSIG.SF</exclude>
-                                <exclude>META-INF/MSFTSIG.RSA</exclude>
-                            </excludes>
-                        </filter>
-                    </filters>
-                </configuration>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                </transformer>
-                            </transformers>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-checkstyle-plugin</artifactId>
-                <!--Note - the version would be inherited-->
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-pmd-plugin</artifactId>
-            </plugin>
-        </plugins>
-    </build>
-</project>
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
deleted file mode 100644
index 3b40b51fc..000000000
--- a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.topology;
-
-import java.util.Properties;
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.rocketmq.RocketMqConfig;
-import org.apache.storm.rocketmq.SpoutConfig;
-import org.apache.storm.rocketmq.bolt.RocketMqBolt;
-import 
org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper;
-import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
-import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector;
-import org.apache.storm.rocketmq.common.selector.TopicSelector;
-import org.apache.storm.rocketmq.spout.RocketMqSpout;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-
-public class WordCountTopology {
-    private static final String WORD_SPOUT = "WORD_SPOUT";
-    private static final String COUNT_BOLT = "COUNT_BOLT";
-    private static final String INSERT_BOLT = "INSERT_BOLT";
-
-    private static final String CONSUMER_GROUP = "wordcount";
-    private static final String CONSUMER_TOPIC = "source";
-
-    public static StormTopology buildTopology(String nameserverAddr, String 
topic) {
-        Properties properties = new Properties();
-        properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, nameserverAddr);
-        properties.setProperty(SpoutConfig.CONSUMER_GROUP, CONSUMER_GROUP);
-        properties.setProperty(SpoutConfig.CONSUMER_TOPIC, CONSUMER_TOPIC);
-
-        RocketMqSpout spout = new RocketMqSpout(properties);
-
-        TupleToMessageMapper mapper = new 
FieldNameBasedTupleToMessageMapper("word", "count");
-        TopicSelector selector = new DefaultTopicSelector(topic);
-
-        properties = new Properties();
-        properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, 
nameserverAddr);
-
-        RocketMqBolt insertBolt = new RocketMqBolt()
-                .withMapper(mapper)
-                .withSelector(selector)
-                .withProperties(properties);
-
-        // wordSpout ==> countBolt ==> insertBolt
-        TopologyBuilder builder = new TopologyBuilder();
-
-        WordCounter bolt = new WordCounter();
-        builder.setSpout(WORD_SPOUT, spout, 1);
-        builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new 
Fields("str"));
-        builder.setBolt(INSERT_BOLT, insertBolt, 
1).shuffleGrouping(COUNT_BOLT);
-
-        return builder.createTopology();
-    }
-
-    public static void main(String[] args) throws Exception {
-        Config conf = new Config();
-        conf.setMaxSpoutPending(5);
-        conf.setNumWorkers(3);
-
-        String topologyName = "wordCounter";
-        if (args.length < 2) {
-            System.out.println("Usage: WordCountTopology <nameserver addr> 
<topic> [topology name]");
-        } else {
-            if (args.length > 3) {
-                topologyName = args[2];
-            }
-            StormSubmitter.submitTopology(topologyName, conf, 
buildTopology(args[0], args[1]));
-        }
-    }
-}
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
deleted file mode 100644
index dd6820036..000000000
--- a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.topology;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.IBasicBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-public class WordCounter implements IBasicBolt {
-    private Map<String, Integer> wordCounter = new HashMap<>();
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context) 
{
-        
-    }
-
-    @Override
-    public void execute(Tuple input, BasicOutputCollector collector) {
-        String word = input.getStringByField("str");
-        int count;
-        if (wordCounter.containsKey(word)) {
-            count = wordCounter.get(word) + 1;
-        } else {
-            count = 1;
-        }
-        wordCounter.put(word, count);
-        collector.emit(new Values(word, count));
-    }
-
-    @Override
-    public void cleanup() {
-
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("word", "count"));
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-}
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
deleted file mode 100644
index dbdee9355..000000000
--- a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.trident;
-
-import java.util.Properties;
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.rocketmq.RocketMqConfig;
-import 
org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper;
-import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
-import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector;
-import org.apache.storm.rocketmq.common.selector.TopicSelector;
-import org.apache.storm.rocketmq.trident.state.RocketMqState;
-import org.apache.storm.rocketmq.trident.state.RocketMqStateFactory;
-import org.apache.storm.rocketmq.trident.state.RocketMqStateUpdater;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.testing.FixedBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class WordCountTrident {
-
-    public static StormTopology buildTopology(String nameserverAddr, String 
topic) {
-        Fields fields = new Fields("word", "count");
-        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
-                new Values("storm", 1),
-                new Values("trident", 1),
-                new Values("needs", 1),
-                new Values("javadoc", 1)
-        );
-        spout.setCycle(true);
-
-        TupleToMessageMapper mapper = new 
FieldNameBasedTupleToMessageMapper("word", "count");
-        TopicSelector selector = new DefaultTopicSelector(topic);
-
-        Properties properties = new Properties();
-        properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, 
nameserverAddr);
-
-        RocketMqState.Options options = new RocketMqState.Options()
-                .withMapper(mapper)
-                .withSelector(selector)
-                .withProperties(properties);
-
-        StateFactory factory = new RocketMqStateFactory(options);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        stream.partitionPersist(factory, fields,
-                new RocketMqStateUpdater(), new Fields());
-
-        return topology.build();
-    }
-
-    public static void main(String[] args) throws Exception {
-        Config conf = new Config();
-        conf.setMaxSpoutPending(5);
-        conf.setNumWorkers(3);
-
-        String topologyName = "wordCounter";
-        if (args.length < 2) {
-            System.out.println("Usage: WordCountTrident <nameserver addr> 
<topic> [topology name]");
-        } else {
-            if (args.length > 3) {
-                topologyName = args[2];
-            }
-            StormSubmitter.submitTopology(topologyName, conf, 
buildTopology(args[0], args[1]));
-        }
-    }
-}
diff --git a/external/storm-rocketmq/README.md b/external/storm-rocketmq/README.md
deleted file mode 100644
index 4934cf0e8..000000000
--- a/external/storm-rocketmq/README.md
+++ /dev/null
@@ -1,150 +0,0 @@
-# Storm RocketMQ
-
-Storm/Trident integration for [RocketMQ](https://rocketmq.apache.org/). This 
package includes the core spout, bolt and trident states that allows a storm 
topology to either write storm tuples into a topic or read from topics in a 
storm topology.
-
-
-## Read from Topic
-The spout included in this package for reading data from a topic.
-
-### RocketMqSpout
-To use the `RocketMqSpout`,  you construct an instance of it by specifying a 
Properties instance which including rocketmq configs.
-RocketMqSpout uses RocketMQ MQPushConsumer as the default implementation. 
PushConsumer is a high level consumer API, wrapping the pulling details. Looks 
like broker push messages to consumer.
-RocketMqSpout's messages retrying depends on RocketMQ's push mode retry policy.
-
- ```java
-        Properties properties = new Properties();
-        properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, nameserverAddr);
-        properties.setProperty(SpoutConfig.CONSUMER_GROUP, group);
-        properties.setProperty(SpoutConfig.CONSUMER_TOPIC, topic);
-
-        RocketMqSpout spout = new RocketMqSpout(properties);
- ```
-
-
-## Write into Topic
-The bolt and trident state included in this package for write data into a 
topic.
-
-### TupleToMessageMapper
-The main API for mapping Storm tuple to a RocketMQ Message is the 
`org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper` interface:
-
-```java
-public interface TupleToMessageMapper extends Serializable {
-    String getKeyFromTuple(ITuple tuple);
-    byte[] getValueFromTuple(ITuple tuple);
-}
-```
-
-### FieldNameBasedTupleToMessageMapper
-`storm-rocketmq` includes a general purpose `TupleToMessageMapper` 
implementation called `FieldNameBasedTupleToMessageMapper`.
-
-### TopicSelector
-The main API for selecting topic and tags is the 
`org.apache.storm.rocketmq.common.selector.TopicSelector` interface:
-
-```java
-public interface TopicSelector extends Serializable {
-    String getTopic(ITuple tuple);
-    String getTag(ITuple tuple);
-}
-```
-
-### DefaultTopicSelector/FieldNameBasedTopicSelector
-`storm-rocketmq` includes general purpose `TopicSelector` implementations 
called `DefaultTopicSelector` and `FieldNameBasedTopicSelector`.
-
-
-### RocketMqBolt
-To use the `RocketMqBolt`, you construct an instance of it by specifying 
TupleToMessageMapper, TopicSelector and Properties instances.
-RocketMqBolt send messages async by default. You can change this by invoking 
`withAsync(false)`.
-
- ```java
-        TupleToMessageMapper mapper = new 
FieldNameBasedTupleToMessageMapper("word", "count");
-        TopicSelector selector = new DefaultTopicSelector(topic);
-
-        properties = new Properties();
-        properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, 
nameserverAddr);
-
-        RocketMqBolt insertBolt = new RocketMqBolt()
-                .withMapper(mapper)
-                .withSelector(selector)
-                .withProperties(properties);
- ```
-
-### Trident State
-We support trident persistent state that can be used with trident topologies. 
To create a RocketMQ persistent trident state you need to initialize it with 
the TupleToMessageMapper, TopicSelector, Properties instances. See the example 
below:
-
- ```java
-        TupleToMessageMapper mapper = new 
FieldNameBasedTupleToMessageMapper("word", "count");
-        TopicSelector selector = new DefaultTopicSelector(topic);
-
-        Properties properties = new Properties();
-        properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, 
nameserverAddr);
-
-        RocketMqState.Options options = new RocketMqState.Options()
-                .withMapper(mapper)
-                .withSelector(selector)
-                .withProperties(properties);
-
-        StateFactory factory = new RocketMqStateFactory(options);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        stream.partitionPersist(factory, fields,
-                new RocketMqStateUpdater(), new Fields());
- ```
-
-## Configurations
-
-### Producer Configurations
-| NAME        | DESCRIPTION           | DEFAULT  |
-| ------------- |:-------------:|:------:|
-| nameserver.address      | name server address *Required* | null |
-| nameserver.poll.interval      | name server poll topic info interval     |   
30000 |
-| brokerserver.heartbeat.interval | broker server heartbeat interval      |    
30000 |
-| producer.group | producer group      |    $UUID |
-| producer.retry.times | producer send messages retry times      |    3 |
-| producer.timeout | producer send messages timeout      |    3000 |
-
-
-### Consumer Configurations
-| NAME        | DESCRIPTION           | DEFAULT  |
-| ------------- |:-------------:|:------:|
-| nameserver.address      | name server address *Required* | null |
-| nameserver.poll.interval      | name server poll topic info interval     |   
30000 |
-| brokerserver.heartbeat.interval | broker server heartbeat interval      |    
30000 |
-| consumer.group | consumer group *Required*     |    null |
-| consumer.topic | consumer topic *Required*       |    null |
-| consumer.tag | consumer topic tag      |    * |
-| consumer.offset.reset.to | what to do when there is no initial offset on the 
server      |   latest/earliest/timestamp |
-| consumer.offset.from.timestamp | the timestamp when 
`consumer.offset.reset.to=timestamp` was set   |   $TIMESTAMP |
-| consumer.messages.orderly | if the consumer topic is ordered      |    false 
|
-| consumer.offset.persist.interval | auto commit offset interval      |    
5000 |
-| consumer.min.threads | consumer min threads      |    20 |
-| consumer.max.threads | consumer max threads      |    64 |
-| consumer.callback.executor.threads | client callback executor threads      | 
   $availableProcessors |
-| consumer.batch.size | consumer messages batch size      |    32 |
-| consumer.batch.process.timeout | consumer messages batch process timeout     
 |   $TOPOLOGY_MESSAGE_TIMEOUT_SECS + 10s|
-
-
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
-
- * Xin Wang ([[email protected]](mailto:[email protected]))
-
diff --git a/external/storm-rocketmq/pom.xml b/external/storm-rocketmq/pom.xml
deleted file mode 100644
index 969ac1d30..000000000
--- a/external/storm-rocketmq/pom.xml
+++ /dev/null
@@ -1,90 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>2.6.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>storm-rocketmq</artifactId>
-    <name>storm-rocketmq</name>
-
-    <packaging>jar</packaging>
-
-    <developers>
-        <developer>
-            <id>vesense</id>
-            <name>Xin Wang</name>
-            <email>[email protected]</email>
-        </developer>
-    </developers>
-
-    <dependencies>
-        <!--parent module dependency-->
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-client</artifactId>
-            <version>${project.version}</version>
-            <scope>${provided.scope}</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-client</artifactId>
-            <version>${rocketmq.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-lang</groupId>
-            <artifactId>commons-lang</artifactId>
-        </dependency>
-        <!--test dependencies -->
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-server</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-checkstyle-plugin</artifactId>
-                <!--Note - the version would be inherited-->
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-pmd-plugin</artifactId>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerBatchMessage.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerBatchMessage.java
deleted file mode 100644
index c32a18a35..000000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerBatchMessage.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq;
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-public class ConsumerBatchMessage<T> {
-    private final List<T> data;
-    private CountDownLatch latch;
-    private boolean hasFailure = false;
-
-    public ConsumerBatchMessage(List<T> data) {
-        this.data = data;
-        latch = new CountDownLatch(data.size());
-    }
-
-    public boolean waitFinish(long timeout) throws InterruptedException {
-        return latch.await(timeout, TimeUnit.MILLISECONDS);
-    }
-
-    public boolean isSuccess() {
-        return !hasFailure;
-    }
-
-    public List<T> getData() {
-        return data;
-    }
-
-    /**
-     * Countdown if the sub message is successful.
-     */
-    public void ack() {
-        latch.countDown();
-    }
-
-    /**
-     * Countdown and fail-fast if the sub message is failed.
-     */
-    public void fail() {
-        hasFailure = true;
-        // fail fast
-        long count = latch.getCount();
-        for (int i = 0; i < count; i++) {
-            latch.countDown();
-        }
-    }
-}
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
deleted file mode 100644
index 7cfaac6c0..000000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq;
-
-import java.nio.charset.StandardCharsets;
-
-public class DefaultMessageBodySerializer implements MessageBodySerializer {
-
-    /**
-     * Currently, we just convert string to bytes using UTF-8 charset.
-     * Note: in this way, object.toString() method is invoked.
-     * @param body RocketMQ Message body
-     * @return serialized byte array
-     */
-    @Override
-    public byte[] serialize(Object body) {
-        if (body == null) {
-            return null;
-        }
-        return body.toString().getBytes(StandardCharsets.UTF_8);
-    }
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
deleted file mode 100644
index fbf16cd6c..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq;
-
-import java.io.Serializable;
-
-/**
- * RocketMQ message body serializer.
- */
-public interface MessageBodySerializer extends Serializable {
-    byte[] serialize(Object body);
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java
deleted file mode 100644
index 9a239beae..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq;
-
-import static org.apache.storm.rocketmq.RocketMqUtils.getInteger;
-
-import java.util.Properties;
-import java.util.UUID;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-
-/**
- * RocketMqConfig for Consumer/Producer.
- */
-public class RocketMqConfig {
-    // common
-    public static final String NAME_SERVER_ADDR = "nameserver.address"; // 
Required
-
-    public static final String NAME_SERVER_POLL_INTERVAL = 
"nameserver.poll.interval";
-    public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 
seconds
-
-    public static final String BROKER_HEART_BEAT_INTERVAL = 
"brokerserver.heartbeat.interval";
-    public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 
seconds
-
-
-    // producer
-    public static final String PRODUCER_GROUP = "producer.group";
-
-    public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
-    public static final int DEFAULT_PRODUCER_RETRY_TIMES = 3;
-
-    public static final String PRODUCER_TIMEOUT = "producer.timeout";
-    public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
-
-
-    // consumer
-    public static final String CONSUMER_GROUP = "consumer.group"; // Required
-
-    public static final String CONSUMER_TOPIC = "consumer.topic"; // Required
-
-    public static final String CONSUMER_TAG = "consumer.tag";
-    public static final String DEFAULT_CONSUMER_TAG = "*";
-
-    public static final String CONSUMER_OFFSET_RESET_TO = 
"consumer.offset.reset.to";
-    public static final String CONSUMER_OFFSET_LATEST = "latest";
-    public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
-    public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
-    public static final String CONSUMER_OFFSET_FROM_TIMESTAMP = 
"consumer.offset.from.timestamp";
-
-    public static final String CONSUMER_MESSAGES_ORDERLY = 
"consumer.messages.orderly";
-
-    public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = 
"consumer.offset.persist.interval";
-    public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; 
// 5 seconds
-
-    public static final String CONSUMER_MIN_THREADS = "consumer.min.threads";
-    public static final int DEFAULT_CONSUMER_MIN_THREADS = 20;
-
-    public static final String CONSUMER_MAX_THREADS = "consumer.max.threads";
-    public static final int DEFAULT_CONSUMER_MAX_THREADS = 64;
-
-    public static final String CONSUMER_CALLBACK_EXECUTOR_THREADS = 
"consumer.callback.executor.threads";
-    public static final int DEFAULT_CONSUMER_CALLBACK_EXECUTOR_THREADS = 
Runtime.getRuntime().availableProcessors();
-
-    public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size";
-    public static final int DEFAULT_CONSUMER_BATCH_SIZE = 32;
-
-    public static final String CONSUMER_BATCH_PROCESS_TIMEOUT = 
"consumer.batch.process.timeout";
-
-    /**
-     * Build Producer Configs.
-     * @param props Properties
-     * @param producer DefaultMQProducer
-     */
-    public static void buildProducerConfigs(Properties props, 
DefaultMQProducer producer) {
-        buildCommonConfigs(props, producer);
-
-        String group = props.getProperty(PRODUCER_GROUP);
-        if (StringUtils.isEmpty(group)) {
-            group = UUID.randomUUID().toString();
-        }
-        producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, group));
-
-        producer.setRetryTimesWhenSendFailed(getInteger(props,
-                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
-        producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
-                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
-        producer.setSendMsgTimeout(getInteger(props,
-                PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
-    }
-
-    /**
-     * Build Consumer Configs.
-     * @param props Properties
-     * @param consumer DefaultMQPushConsumer
-     */
-    public static void buildConsumerConfigs(Properties props, 
DefaultMQPushConsumer consumer) {
-        buildCommonConfigs(props, consumer);
-
-        String group = props.getProperty(CONSUMER_GROUP);
-        Validate.notEmpty(group);
-        consumer.setConsumerGroup(group);
-
-        consumer.setPullBatchSize(getInteger(props,
-            CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE));
-
-        consumer.setPersistConsumerOffsetInterval(getInteger(props,
-                CONSUMER_OFFSET_PERSIST_INTERVAL, 
DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
-        consumer.setConsumeThreadMin(getInteger(props,
-                CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS));
-        consumer.setConsumeThreadMax(getInteger(props,
-                CONSUMER_MAX_THREADS, DEFAULT_CONSUMER_MAX_THREADS));
-
-        consumer.setClientCallbackExecutorThreads(getInteger(props,
-            CONSUMER_CALLBACK_EXECUTOR_THREADS, 
DEFAULT_CONSUMER_CALLBACK_EXECUTOR_THREADS));
-
-        String initOffset = props.getProperty(CONSUMER_OFFSET_RESET_TO, 
CONSUMER_OFFSET_LATEST);
-        switch (initOffset) {
-            case CONSUMER_OFFSET_EARLIEST:
-                
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-                break;
-            case CONSUMER_OFFSET_LATEST:
-                
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
-                break;
-            case CONSUMER_OFFSET_TIMESTAMP:
-                String timestamp = 
props.getProperty(CONSUMER_OFFSET_FROM_TIMESTAMP);
-                consumer.setConsumeTimestamp(timestamp);
-                break;
-            default:
-                
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
-        }
-
-        String topic = props.getProperty(CONSUMER_TOPIC);
-        Validate.notEmpty(topic);
-        try {
-            consumer.subscribe(topic, props.getProperty(CONSUMER_TAG, 
DEFAULT_CONSUMER_TAG));
-        } catch (MQClientException e) {
-            throw new IllegalArgumentException(e);
-        }
-    }
-
-    /**
-     * Build Common Configs.
-     * @param props Properties
-     * @param client ClientConfig
-     */
-    public static void buildCommonConfigs(Properties props, ClientConfig 
client) {
-        String nameServers = props.getProperty(NAME_SERVER_ADDR);
-        Validate.notEmpty(nameServers);
-        client.setNamesrvAddr(nameServers);
-
-        client.setPollNameServerInterval(getInteger(props,
-                NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
-        client.setHeartbeatBrokerInterval(getInteger(props,
-                BROKER_HEART_BEAT_INTERVAL, 
DEFAULT_BROKER_HEART_BEAT_INTERVAL));
-    }
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java
deleted file mode 100644
index a2d7e7a6d..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.rocketmq.common.message.Message;
-import org.apache.storm.rocketmq.spout.scheme.KeyValueScheme;
-import org.apache.storm.spout.Scheme;
-
-public final class RocketMqUtils {
-
-    public static int getInteger(Properties props, String key, int 
defaultValue) {
-        return Integer.parseInt(props.getProperty(key, 
String.valueOf(defaultValue)));
-    }
-
-    public static long getLong(Properties props, String key, long 
defaultValue) {
-        return Long.parseLong(props.getProperty(key, 
String.valueOf(defaultValue)));
-    }
-
-    public static boolean getBoolean(Properties props, String key, boolean 
defaultValue) {
-        return Boolean.parseBoolean(props.getProperty(key, 
String.valueOf(defaultValue)));
-    }
-
-    /**
-     * Create Scheme by Properties.
-     * @param props Properties
-     * @return Scheme
-     */
-    public static Scheme createScheme(Properties props) {
-        String schemeString = props.getProperty(SpoutConfig.SCHEME, 
SpoutConfig.DEFAULT_SCHEME);
-        Scheme scheme;
-        try {
-            Class clazz = Class.forName(schemeString);
-            scheme = (Scheme) clazz.newInstance();
-        } catch (Exception e) {
-            throw new IllegalArgumentException("Cannot create Scheme for " + 
schemeString
-                    + " due to " + e.getMessage());
-        }
-        return scheme;
-    }
-
-    /**
-     * Generate Storm tuple values by Message and Scheme.
-     * @param msg RocketMQ Message
-     * @param scheme Scheme for deserializing
-     * @return tuple values
-     */
-    public static List<Object> generateTuples(Message msg, Scheme scheme) {
-        List<Object> tup;
-        String rawKey = msg.getKeys();
-        ByteBuffer body = ByteBuffer.wrap(msg.getBody());
-        if (rawKey != null && scheme instanceof KeyValueScheme) {
-            ByteBuffer key = 
ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
-            tup = ((KeyValueScheme) scheme).deserializeKeyAndValue(key, body);
-        } else {
-            tup = scheme.deserialize(body);
-        }
-        return tup;
-    }
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
deleted file mode 100644
index 83e956d53..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq;
-
-import org.apache.storm.rocketmq.spout.scheme.StringScheme;
-
-public class SpoutConfig extends RocketMqConfig {
-
-    public static final String SCHEME = "spout.scheme";
-    public static final String DEFAULT_SCHEME = StringScheme.class.getName();
-
-
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java
deleted file mode 100644
index 4cfa9f1d9..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.bolt;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.commons.lang.Validate;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.storm.Config;
-import org.apache.storm.rocketmq.RocketMqConfig;
-import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
-import org.apache.storm.rocketmq.common.selector.TopicSelector;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.BatchHelper;
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RocketMqBolt implements IRichBolt {
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOG = 
LoggerFactory.getLogger(RocketMqBolt.class);
-
-    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 5;
-    private static final int DEFAULT_BATCH_SIZE = 20;
-
-    private DefaultMQProducer producer;
-    private OutputCollector collector;
-    private TopicSelector selector;
-    private TupleToMessageMapper mapper;
-    private Properties properties;
-
-    private boolean async = true;
-    private boolean batch = false;
-    private int batchSize = DEFAULT_BATCH_SIZE;
-    private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
-    private BatchHelper batchHelper;
-    private List<Message> messages;
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context, 
OutputCollector collector) {
-        Validate.notEmpty(properties, "Producer properties can not be empty");
-        Validate.notNull(selector, "TopicSelector can not be null");
-        Validate.notNull(mapper, "TupleToMessageMapper can not be null");
-
-        producer = new DefaultMQProducer();
-        producer.setInstanceName(String.valueOf(context.getThisTaskId()));
-        RocketMqConfig.buildProducerConfigs(properties, producer);
-
-        try {
-            producer.start();
-        } catch (MQClientException e) {
-            throw new RuntimeException(e);
-        }
-
-        this.collector = collector;
-        this.batchHelper = new BatchHelper(batchSize, collector);
-        this.messages = new LinkedList<>();
-    }
-
-    public RocketMqBolt withSelector(TopicSelector selector) {
-        this.selector = selector;
-        return this;
-    }
-
-    public RocketMqBolt withMapper(TupleToMessageMapper mapper) {
-        this.mapper = mapper;
-        return this;
-    }
-
-    public RocketMqBolt withAsync(boolean async) {
-        this.async = async;
-        return this;
-    }
-
-    public RocketMqBolt withBatch(boolean batch) {
-        this.batch = batch;
-        return this;
-    }
-
-    public RocketMqBolt withBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-        return this;
-    }
-
-    public RocketMqBolt withFlushIntervalSecs(int flushIntervalSecs) {
-        this.flushIntervalSecs = flushIntervalSecs;
-        return this;
-    }
-
-    public RocketMqBolt withProperties(Properties properties) {
-        this.properties = properties;
-        return this;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        if (!batch && TupleUtils.isTick(input)) {
-            return;
-        }
-
-        String topic = selector.getTopic(input);
-        if (topic == null) {
-            LOG.warn("skipping Message due to topic selector returned null.");
-            collector.ack(input);
-            return;
-        }
-
-        if (batch) {
-            // batch sync sending
-            try {
-                if (batchHelper.shouldHandle(input)) {
-                    batchHelper.addBatch(input);
-                    messages.add(prepareMessage(input));
-                }
-
-                if (batchHelper.shouldFlush()) {
-                    producer.send(messages);
-                    batchHelper.ack();
-                    messages.clear();
-                }
-            } catch (Exception e) {
-                LOG.error("Batch send messages failure!", e);
-                batchHelper.fail(e);
-                messages.clear();
-            }
-        } else {
-            if (async) {
-                // async sending
-                try {
-                    producer.send(prepareMessage(input), new SendCallback() {
-                        @Override
-                        public void onSuccess(SendResult sendResult) {
-                            collector.ack(input);
-                        }
-
-                        @Override
-                        public void onException(Throwable throwable) {
-                            if (throwable != null) {
-                                LOG.error("Async send messages failure!", 
throwable);
-                                collector.reportError(throwable);
-                                collector.fail(input);
-                            }
-                        }
-                    });
-                } catch (Exception e) {
-                    LOG.error("Async send messages failure!", e);
-                    collector.reportError(e);
-                    collector.fail(input);
-                }
-            } else {
-                // sync sending, will return a SendResult
-                try {
-                    producer.send(prepareMessage(input));
-                    collector.ack(input);
-                } catch (Exception e) {
-                    LOG.error("Sync send messages failure!", e);
-                    collector.reportError(e);
-                    collector.fail(input);
-                }
-            }
-        }
-    }
-
-    // Mapping: from storm tuple -> rocketmq Message
-    private Message prepareMessage(Tuple input) {
-        String topic = selector.getTopic(input);
-        String tag = selector.getTag(input);
-        String key = mapper.getKeyFromTuple(input);
-        byte[] value = mapper.getValueFromTuple(input);
-
-        return new Message(topic, tag, key, value);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return TupleUtils.putTickFrequencyIntoComponentConfig(new Config(), 
flushIntervalSecs);
-    }
-
-    @Override
-    public void cleanup() {
-        if (producer != null) {
-            producer.shutdown();
-        }
-    }
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
deleted file mode 100644
index a38a36539..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.common.mapper;
-
-import org.apache.storm.rocketmq.DefaultMessageBodySerializer;
-import org.apache.storm.rocketmq.MessageBodySerializer;
-import org.apache.storm.tuple.ITuple;
-
-public class FieldNameBasedTupleToMessageMapper implements 
TupleToMessageMapper {
-    public static final String BOLT_KEY = "key";
-    public static final String BOLT_MESSAGE = "message";
-    public String boltKeyField;
-    public String boltMessageField;
-    private MessageBodySerializer messageBodySerializer;
-
-    public FieldNameBasedTupleToMessageMapper() {
-        this(BOLT_KEY, BOLT_MESSAGE);
-    }
-
-    /**
-     * FieldNameBasedTupleToMessageMapper Constructor.
-     * @param boltKeyField tuple field for selecting the key
-     * @param boltMessageField  tuple field for selecting the value
-     */
-    public FieldNameBasedTupleToMessageMapper(String boltKeyField, String 
boltMessageField) {
-        this.boltKeyField = boltKeyField;
-        this.boltMessageField = boltMessageField;
-        this.messageBodySerializer = new DefaultMessageBodySerializer();
-    }
-
-    @Override
-    public String getKeyFromTuple(ITuple tuple) {
-        return tuple.getStringByField(boltKeyField);
-    }
-
-    @Override
-    public byte[] getValueFromTuple(ITuple tuple) {
-        Object obj = tuple.getValueByField(boltMessageField);
-        if (obj == null) {
-            return null;
-        }
-        return messageBodySerializer.serialize(obj);
-    }
-
-    /**
-     * using this method can override the default  MessageBodySerializer.
-     * @param serializer MessageBodySerializer
-     * @return this object
-     */
-    public FieldNameBasedTupleToMessageMapper 
withMessageBodySerializer(MessageBodySerializer serializer) {
-        this.messageBodySerializer = serializer;
-        return this;
-    }
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
deleted file mode 100644
index ef716a4c4..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.common.mapper;
-
-import java.io.Serializable;
-import org.apache.storm.tuple.ITuple;
-
-/**
- * Interface defining a mapping from storm tuple to rocketmq key and message.
- */
-public interface TupleToMessageMapper extends Serializable {
-
-    String getKeyFromTuple(ITuple tuple);
-
-    byte[] getValueFromTuple(ITuple tuple);
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
deleted file mode 100644
index e0192da68..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.common.selector;
-
-import org.apache.storm.tuple.ITuple;
-
-public class DefaultTopicSelector implements TopicSelector {
-    private final String topicName;
-    private final String tagName;
-
-    public DefaultTopicSelector(final String topicName, final String tagName) {
-        this.topicName = topicName;
-        this.tagName = tagName;
-    }
-
-    public DefaultTopicSelector(final String topicName) {
-        this(topicName, "");
-    }
-
-    @Override
-    public String getTopic(ITuple tuple) {
-        return topicName;
-    }
-
-    @Override
-    public String getTag(ITuple tuple) {
-        return tagName;
-    }
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
deleted file mode 100644
index 253221ee5..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.common.selector;
-
-import org.apache.storm.tuple.ITuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Uses field name to select topic and tag name from tuple.
- */
-public class FieldNameBasedTopicSelector implements TopicSelector {
-    private static final Logger LOG = 
LoggerFactory.getLogger(FieldNameBasedTopicSelector.class);
-
-    private final String topicFieldName;
-    private final String defaultTopicName;
-
-    private final String tagFieldName;
-    private final String defaultTagName;
-
-    /**
-     * FieldNameBasedTopicSelector Constructor.
-     * @param topicFieldName field name used for selecting topic
-     * @param defaultTopicName default field name used for selecting topic
-     * @param tagFieldName field name used for selecting tag
-     * @param defaultTagName default field name used for selecting tag
-     */
-    public FieldNameBasedTopicSelector(String topicFieldName, String 
defaultTopicName, String tagFieldName, String defaultTagName) {
-        this.topicFieldName = topicFieldName;
-        this.defaultTopicName = defaultTopicName;
-        this.tagFieldName = tagFieldName;
-        this.defaultTagName = defaultTagName;
-    }
-
-    @Override
-    public String getTopic(ITuple tuple) {
-        if (tuple.contains(topicFieldName)) {
-            return tuple.getStringByField(topicFieldName);
-        } else {
-            LOG.warn("Field {} Not Found. Returning default topic {}", 
topicFieldName, defaultTopicName);
-            return defaultTopicName;
-        }
-    }
-
-    @Override
-    public String getTag(ITuple tuple) {
-        if (tuple.contains(tagFieldName)) {
-            return tuple.getStringByField(tagFieldName);
-        } else {
-            LOG.warn("Field {} Not Found. Returning default tag {}", 
tagFieldName, defaultTagName);
-            return defaultTagName;
-        }
-    }
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
deleted file mode 100644
index 2940920fe..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.common.selector;
-
-import java.io.Serializable;
-import org.apache.storm.tuple.ITuple;
-
-public interface TopicSelector extends Serializable {
-
-    String getTopic(ITuple tuple);
-
-    String getTag(ITuple tuple);
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java
deleted file mode 100644
index c08dd6263..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.spout;
-
-import static org.apache.storm.rocketmq.RocketMqUtils.getBoolean;
-import static org.apache.storm.rocketmq.RocketMqUtils.getLong;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.commons.lang.Validate;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
-import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.storm.Config;
-import org.apache.storm.rocketmq.ConsumerBatchMessage;
-import org.apache.storm.rocketmq.RocketMqConfig;
-import org.apache.storm.rocketmq.RocketMqUtils;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * RocketMqSpout uses MQPushConsumer as the default implementation.
- * PushConsumer is a high level consumer API, wrapping the pulling details
- * Looks like broker push messages to consumer
- */
-public class RocketMqSpout implements IRichSpout {
-    // TODO add metrics
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOG = 
LoggerFactory.getLogger(RocketMqSpout.class);
-
-    private DefaultMQPushConsumer consumer;
-    private SpoutOutputCollector collector;
-    private BlockingQueue<ConsumerBatchMessage<List<Object>>> queue;
-    private Map<String, ConsumerBatchMessage<List<Object>>> cache;
-
-    private Properties properties;
-    private Scheme scheme;
-    private long batchProcessTimeout;
-
-    /**
-     * RocketMqSpout Constructor.
-     * @param properties Properties Config
-     */
-    public RocketMqSpout(Properties properties) {
-        Validate.notEmpty(properties, "Consumer properties can not be empty");
-        this.properties = properties;
-        scheme = RocketMqUtils.createScheme(properties);
-    }
-
-    @Override
-    public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
-        consumer = new DefaultMQPushConsumer();
-        consumer.setInstanceName(String.valueOf(context.getThisTaskId()));
-        RocketMqConfig.buildConsumerConfigs(properties, consumer);
-
-        boolean ordered = getBoolean(properties, 
RocketMqConfig.CONSUMER_MESSAGES_ORDERLY, false);
-        if (ordered) {
-            consumer.registerMessageListener(new MessageListenerOrderly() {
-                @Override
-                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> 
msgs,
-                                                           
ConsumeOrderlyContext context) {
-                    if (process(msgs)) {
-                        return ConsumeOrderlyStatus.SUCCESS;
-                    } else {
-                        return 
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
-                    }
-                }
-            });
-        } else {
-            consumer.registerMessageListener(new MessageListenerConcurrently() 
{
-                @Override
-                public ConsumeConcurrentlyStatus 
consumeMessage(List<MessageExt> msgs,
-                                                                
ConsumeConcurrentlyContext context) {
-                    if (process(msgs)) {
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                    } else {
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-                    }
-                }
-            });
-        }
-
-        try {
-            consumer.start();
-        } catch (MQClientException e) {
-            LOG.error("Failed to start RocketMQ consumer.", e);
-            throw new RuntimeException(e);
-        }
-
-        long defaultBatchProcessTimeout = (long) 
conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30) * 1000 + 10000;
-        batchProcessTimeout = getLong(properties, 
RocketMqConfig.CONSUMER_BATCH_PROCESS_TIMEOUT, defaultBatchProcessTimeout);
-
-        queue = new LinkedBlockingQueue<>();
-        cache = new ConcurrentHashMap<>();
-        this.collector = collector;
-    }
-
-    /**
-     * Process pushed messages.
-     * @param msgs messages
-     * @return the boolean flag processed result
-     */
-    protected boolean process(List<MessageExt> msgs) {
-        if (msgs.isEmpty()) {
-            return true;
-        }
-
-        List<List<Object>> list = new ArrayList<>(msgs.size());
-        for (MessageExt msg : msgs) {
-            List<Object> data = RocketMqUtils.generateTuples(msg, scheme);
-            if (data != null) {
-                list.add(data);
-            }
-        }
-        ConsumerBatchMessage<List<Object>> batchMessage = new 
ConsumerBatchMessage<>(list);
-        try {
-            queue.put(batchMessage);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-
-        boolean isCompleted;
-        try {
-            isCompleted = batchMessage.waitFinish(batchProcessTimeout);
-        } catch (InterruptedException e) {
-            LOG.error("Interrupted when waiting messages to be finished.", e);
-            throw new RuntimeException(e);
-        }
-
-        boolean isSuccess = batchMessage.isSuccess();
-
-        return isCompleted && isSuccess;
-    }
-
-    @Override
-    public void nextTuple() {
-        ConsumerBatchMessage<List<Object>> batchMessage = queue.poll();
-        if (batchMessage != null) {
-            List<List<Object>> list = batchMessage.getData();
-            for (List<Object> data : list) {
-                String messageId = UUID.randomUUID().toString();
-                cache.put(messageId, batchMessage);
-                collector.emit(data, messageId);
-            }
-        }
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        ConsumerBatchMessage batchMessage = cache.get(msgId);
-        batchMessage.ack();
-        cache.remove(msgId);
-        LOG.debug("Message acked {}", batchMessage);
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        ConsumerBatchMessage batchMessage = cache.get(msgId);
-        batchMessage.fail();
-        cache.remove(msgId);
-        LOG.debug("Message failed {}", batchMessage);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(scheme.getOutputFields());
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-    @Override
-    public void close() {
-        if (consumer != null) {
-            consumer.shutdown();
-        }
-    }
-
-    @Override
-    public void activate() {
-        if (consumer != null) {
-            consumer.resume();
-        }
-    }
-
-    @Override
-    public void deactivate() {
-        if (consumer != null) {
-            consumer.suspend();
-        }
-    }
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
deleted file mode 100644
index f4cf803bb..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.spout.scheme;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.storm.spout.Scheme;
-
-public interface KeyValueScheme extends Scheme {
-    List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value);
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
deleted file mode 100644
index 21a762e1f..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.spout.scheme;
-
-import com.google.common.collect.ImmutableMap;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.storm.tuple.Values;
-
-public class StringKeyValueScheme extends StringScheme implements 
KeyValueScheme {
-
-    @Override
-    public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer 
value) {
-        if (key == null) {
-            return deserialize(value);
-        }
-        String keyString = StringScheme.deserializeString(key);
-        String valueString = StringScheme.deserializeString(value);
-        return new Values(ImmutableMap.of(keyString, valueString));
-    }
-
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
deleted file mode 100644
index 53fdc57aa..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.spout.scheme;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-public class StringScheme implements Scheme {
-    public static final String STRING_SCHEME_KEY = "str";
-
-    @Override
-    public List<Object> deserialize(ByteBuffer bytes) {
-        return new Values(deserializeString(bytes));
-    }
-
-    /**
-     * Deserialize ByteBuffer to String.
-     * @param byteBuffer input ByteBuffer
-     * @return deserialized string
-     */
-    public static String deserializeString(ByteBuffer byteBuffer) {
-        if (byteBuffer.hasArray()) {
-            int base = byteBuffer.arrayOffset();
-            return new String(byteBuffer.array(), base + 
byteBuffer.position(), byteBuffer.remaining(),
-                StandardCharsets.UTF_8);
-        } else {
-            return new String(Utils.toByteArray(byteBuffer), 
StandardCharsets.UTF_8);
-        }
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return new Fields(STRING_SCHEME_KEY);
-    }
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
deleted file mode 100644
index c42bd4711..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.trident.state;
-
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-
-import org.apache.commons.lang.Validate;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.storm.rocketmq.RocketMqConfig;
-import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
-import org.apache.storm.rocketmq.common.selector.TopicSelector;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RocketMqState implements State {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(RocketMqState.class);
-
-    private Options options;
-    private DefaultMQProducer producer;
-
-    protected RocketMqState(Map<String, Object> map, Options options) {
-        this.options = options;
-    }
-
-    public static class Options implements Serializable {
-        private TopicSelector selector;
-        private TupleToMessageMapper mapper;
-        private Properties properties;
-
-        public Options withSelector(TopicSelector selector) {
-            this.selector = selector;
-            return this;
-        }
-
-        public Options withMapper(TupleToMessageMapper mapper) {
-            this.mapper = mapper;
-            return this;
-        }
-
-        public Options withProperties(Properties properties) {
-            this.properties = properties;
-            return this;
-        }
-    }
-
-    protected void prepare() {
-        Validate.notEmpty(options.properties, "Producer properties can not be 
empty");
-        Validate.notNull(options.selector, "TopicSelector can not be null");
-        Validate.notNull(options.mapper, "TupleToMessageMapper can not be 
null");
-
-        producer = new DefaultMQProducer();
-        producer.setInstanceName(UUID.randomUUID().toString());
-        RocketMqConfig.buildProducerConfigs(options.properties, producer);
-
-        try {
-            producer.start();
-        } catch (MQClientException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void beginCommit(Long txid) {
-        LOG.debug("beginCommit is noop.");
-    }
-
-    @Override
-    public void commit(Long txid) {
-        LOG.debug("commit is noop.");
-    }
-
-    /**
-     * Update the RocketMQ state.
-     * @param tuples trident tuples
-     * @param collector trident collector
-     */
-    public void updateState(List<TridentTuple> tuples, TridentCollector 
collector) {
-        List<Message> messages = new LinkedList<>();
-
-        for (TridentTuple tuple : tuples) {
-            String topic = options.selector.getTopic(tuple);
-            String tag = options.selector.getTag(tuple);
-            String key = options.mapper.getKeyFromTuple(tuple);
-            byte[] value = options.mapper.getValueFromTuple(tuple);
-
-            if (topic == null) {
-                LOG.warn("skipping Message with Key = " + key + ", topic 
selector returned null.");
-                continue;
-            }
-
-            Message msg = new Message(topic, tag, key, value);
-            messages.add(msg);
-        }
-
-        try {
-            this.producer.send(messages);
-        } catch (Exception e) {
-            LOG.warn("Batch write failed. Triggering replay.", e);
-            collector.reportError(e);
-            throw new FailedException(e);
-        }
-    }
-
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java
deleted file mode 100644
index 64b8277f4..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.trident.state;
-
-import java.util.Map;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-public class RocketMqStateFactory implements StateFactory {
-
-    private RocketMqState.Options options;
-
-    public RocketMqStateFactory(RocketMqState.Options options) {
-        this.options = options;
-    }
-
-    @Override
-    public State makeState(Map<String, Object> conf, IMetricsContext metrics,
-            int partitionIndex, int numPartitions) {
-        RocketMqState state = new RocketMqState(conf, options);
-        state.prepare();
-        return state;
-    }
-
-}
diff --git 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java
 
b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java
deleted file mode 100644
index 589ac5e3f..000000000
--- 
a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.trident.state;
-
-import java.util.List;
-
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public class RocketMqStateUpdater extends BaseStateUpdater<RocketMqState>  {
-
-    @Override
-    public void updateState(RocketMqState state, List<TridentTuple> tuples,
-                            TridentCollector collector) {
-        state.updateState(tuples, collector);
-    }
-
-}
diff --git 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/DefaultMessageBodySerializerTest.java
 
b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/DefaultMessageBodySerializerTest.java
deleted file mode 100644
index dd8ed06f3..000000000
--- 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/DefaultMessageBodySerializerTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq;
-
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-
-public class DefaultMessageBodySerializerTest {
-    @Test
-    public void serialize() {
-        DefaultMessageBodySerializer messageBodySerializer = new 
DefaultMessageBodySerializer();
-        String body = "this is message body data";
-        assertArrayEquals(body.getBytes(), 
messageBodySerializer.serialize(body));
-
-        body = null;
-        assertArrayEquals(null, messageBodySerializer.serialize(body));
-    }
-
-}
\ No newline at end of file
diff --git 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestUtils.java
 
b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestUtils.java
deleted file mode 100644
index 399445594..000000000
--- 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestUtils.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq;
-
-import java.lang.reflect.Field;
-import java.util.Arrays;
-
-import org.apache.storm.Testing;
-import org.apache.storm.testing.MkTupleParam;
-import org.apache.storm.tuple.Tuple;
-
-public class TestUtils {
-    public static void setFieldValue(Object obj, String fieldName, Object 
value) {
-        try {
-            Field field = obj.getClass().getDeclaredField(fieldName);
-            field.setAccessible(true);
-            field.set(obj, value);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    public static Tuple generateTestTuple(String field1, String field2, Object 
value1, Object value2) {
-        MkTupleParam param = new MkTupleParam();
-        param.setFields(field1, field2);
-        Tuple testTuple = Testing.testTuple(Arrays.asList(value1, value2), 
param);
-        return testTuple;
-    }
-}
diff --git 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/bolt/RocketMqBoltTest.java
 
b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/bolt/RocketMqBoltTest.java
deleted file mode 100644
index 76b016764..000000000
--- 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/bolt/RocketMqBoltTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.bolt;
-
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.storm.rocketmq.TestUtils;
-import 
org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper;
-import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector;
-import org.apache.storm.tuple.Tuple;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-public class RocketMqBoltTest {
-
-    private RocketMqBolt rocketMqBolt;
-    private DefaultMQProducer producer;
-
-    @BeforeEach
-    public void setUp() {
-        rocketMqBolt = new RocketMqBolt();
-        rocketMqBolt.withSelector(new DefaultTopicSelector("tpc"));
-        rocketMqBolt.withMapper(new FieldNameBasedTupleToMessageMapper("f1", 
"f2"));
-        rocketMqBolt.withBatch(false);
-        rocketMqBolt.withBatchSize(5);
-        rocketMqBolt.withAsync(true);
-        rocketMqBolt.withFlushIntervalSecs(5);
-
-        producer = mock(DefaultMQProducer.class);
-        TestUtils.setFieldValue(rocketMqBolt, "producer", producer);
-    }
-
-    @Test
-    public void execute() throws Exception {
-        Tuple tuple = TestUtils.generateTestTuple("f1", "f2", "v1", "v2");
-        rocketMqBolt.execute(tuple);
-
-        verify(producer).send(any(Message.class), any(SendCallback.class));
-    }
-
-    @Test
-    public void cleanup() {
-        rocketMqBolt.cleanup();
-        verify(producer).shutdown();
-    }
-}
\ No newline at end of file
diff --git 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapperTest.java
 
b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapperTest.java
deleted file mode 100644
index 885eb5ca9..000000000
--- 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapperTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.common.mapper;
-
-import org.apache.storm.rocketmq.TestUtils;
-import org.apache.storm.tuple.Tuple;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class FieldNameBasedTupleToMessageMapperTest {
-    @Test
-    public void withMessageBodySerializer() throws Exception {
-        FieldNameBasedTupleToMessageMapper messageMapper = new 
FieldNameBasedTupleToMessageMapper("f1", "f2");
-        Tuple tuple = TestUtils.generateTestTuple("f1", "f2", "v1", "v2");
-        assertEquals("v1", messageMapper.getKeyFromTuple(tuple));
-        assertArrayEquals("v2".getBytes(), 
messageMapper.getValueFromTuple(tuple));
-    }
-}
\ No newline at end of file
diff --git 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelectorTest.java
 
b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelectorTest.java
deleted file mode 100644
index 841d5cd9f..000000000
--- 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelectorTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.common.selector;
-
-import org.apache.storm.tuple.Tuple;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class DefaultTopicSelectorTest {
-
-    @Test
-    public void getTopic() {
-        DefaultTopicSelector topicSelector = new DefaultTopicSelector("tpc", 
"tg");
-        Tuple tuple = null;
-        assertEquals("tpc", topicSelector.getTopic(tuple));
-        assertEquals("tg", topicSelector.getTag(tuple));
-
-        topicSelector = new DefaultTopicSelector("tpc2");
-        assertEquals("tpc2", topicSelector.getTopic(tuple));
-        assertEquals("", topicSelector.getTag(tuple));
-    }
-
-}
\ No newline at end of file
diff --git 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelectorTest.java
 
b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelectorTest.java
deleted file mode 100644
index 0d2365688..000000000
--- 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelectorTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.common.selector;
-
-import org.apache.storm.rocketmq.TestUtils;
-import org.apache.storm.tuple.Tuple;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class FieldNameBasedTopicSelectorTest {
-    @Test
-    public void getTopic() {
-        FieldNameBasedTopicSelector topicSelector = new 
FieldNameBasedTopicSelector("f1", "tpc", "f2",  "tg");
-        Tuple tuple = TestUtils.generateTestTuple("f1", "fn", "v1", "vn");
-        assertEquals("v1", topicSelector.getTopic(tuple));
-        assertEquals("tg", topicSelector.getTag(tuple));
-
-        tuple = TestUtils.generateTestTuple("fn", "f2", "vn", "v2");
-        assertEquals("tpc", topicSelector.getTopic(tuple));
-        assertEquals("v2", topicSelector.getTag(tuple));
-    }
-
-}
\ No newline at end of file
diff --git 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/spout/RocketMqSpoutTest.java
 
b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/spout/RocketMqSpoutTest.java
deleted file mode 100644
index f036be134..000000000
--- 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/spout/RocketMqSpoutTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.spout;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.storm.rocketmq.ConsumerBatchMessage;
-import org.apache.storm.rocketmq.RocketMqUtils;
-import org.apache.storm.rocketmq.SpoutConfig;
-import org.apache.storm.rocketmq.TestUtils;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-public class RocketMqSpoutTest {
-    private RocketMqSpout spout;
-    private DefaultMQPushConsumer consumer;
-    private SpoutOutputCollector collector;
-    private BlockingQueue<ConsumerBatchMessage> queue;
-    private Properties properties;
-
-    @BeforeEach
-    public void setUp() {
-        properties = new Properties();
-        properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, "address");
-        properties.setProperty(SpoutConfig.CONSUMER_GROUP, "group");
-        properties.setProperty(SpoutConfig.CONSUMER_TOPIC, "topic");
-
-        spout = new RocketMqSpout(properties);
-        consumer = mock(DefaultMQPushConsumer.class);
-        TestUtils.setFieldValue(spout, "consumer", consumer);
-        collector = mock(SpoutOutputCollector.class);
-        TestUtils.setFieldValue(spout, "collector", collector);
-
-        queue = new LinkedBlockingQueue<>();
-        TestUtils.setFieldValue(spout, "queue", queue);
-
-        Map<String, ConsumerBatchMessage> cache = mock(Map.class);
-        TestUtils.setFieldValue(spout, "cache", cache);
-    }
-
-    @Test
-    public void nextTuple() throws Exception {
-        List<List<Object>> list = new ArrayList<>();
-        list.add(RocketMqUtils.generateTuples(new Message("tpc", 
"body".getBytes()),
-            RocketMqUtils.createScheme(properties)));
-        ConsumerBatchMessage<List<Object>> batchMessage = new 
ConsumerBatchMessage<>(list);
-        queue.put(batchMessage);
-
-        spout.nextTuple();
-        verify(collector).emit(anyList(), anyString());
-    }
-
-    @Test
-    public void close() {
-        spout.close();
-        verify(consumer).shutdown();
-    }
-}
\ No newline at end of file
diff --git 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueSchemeTest.java
 
b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueSchemeTest.java
deleted file mode 100644
index dbb59a51c..000000000
--- 
a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueSchemeTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.rocketmq.spout.scheme;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.Collections;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.storm.tuple.Fields;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class StringKeyValueSchemeTest {
-
-    private StringKeyValueScheme scheme = new StringKeyValueScheme();
-
-    @Test
-    public void testDeserialize() throws Exception {
-        assertEquals(Collections.singletonList("test"), 
scheme.deserialize(wrapString("test")));
-    }
-
-    @Test
-    public void testGetOutputFields() throws Exception {
-        Fields outputFields = scheme.getOutputFields();
-        assertTrue(outputFields.contains(StringScheme.STRING_SCHEME_KEY));
-        assertEquals(1, outputFields.size());
-    }
-
-    @Test
-    public void testDeserializeWithNullKeyAndValue() throws Exception {
-        assertEquals(Collections.singletonList("test"),
-            scheme.deserializeKeyAndValue(null, wrapString("test")));
-    }
-
-    @Test
-    public void testDeserializeWithKeyAndValue() throws Exception {
-        assertEquals(Collections.singletonList(ImmutableMap.of("key", "test")),
-            scheme.deserializeKeyAndValue(wrapString("key"), 
wrapString("test")));
-    }
-
-    private static ByteBuffer wrapString(String s) {
-        return ByteBuffer.wrap(s.getBytes(Charset.defaultCharset()));
-    }
-}
diff --git a/pom.xml b/pom.xml
index 583e94607..efedc0ed1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -499,7 +499,6 @@
                 <module>external/storm-kafka-migration</module>
                 <module>external/storm-kafka-monitor</module>
                 <module>external/storm-jms</module>
-                <module>external/storm-rocketmq</module>
                 <module>external/storm-blobstore-migration</module>
                 <module>integration-test</module>
 
@@ -524,7 +523,6 @@
                 <module>examples/storm-hive-examples</module>
                 <module>examples/storm-elasticsearch-examples</module>
                 <module>examples/storm-jms-examples</module>
-                <module>examples/storm-rocketmq-examples</module>
                 <module>examples/storm-perf</module>
             </modules>
         </profile>

Reply via email to