This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch STORM-3988 in repository https://gitbox.apache.org/repos/asf/storm.git
commit e655c6d88ebbdfd13d4c9c23dee5a698751fd2b7 Author: Richard Zowalla <[email protected]> AuthorDate: Thu Oct 19 09:00:22 2023 +0200 STORM-3988 - Remove "storm-rocketmq" --- examples/storm-rocketmq-examples/pom.xml | 96 --------- .../storm/rocketmq/topology/WordCountTopology.java | 89 -------- .../storm/rocketmq/topology/WordCounter.java | 68 ------ .../storm/rocketmq/trident/WordCountTrident.java | 89 -------- external/storm-rocketmq/README.md | 150 -------------- external/storm-rocketmq/pom.xml | 90 -------- .../storm/rocketmq/ConsumerBatchMessage.java | 65 ------ .../rocketmq/DefaultMessageBodySerializer.java | 38 ---- .../storm/rocketmq/MessageBodySerializer.java | 28 --- .../org/apache/storm/rocketmq/RocketMqConfig.java | 178 ---------------- .../org/apache/storm/rocketmq/RocketMqUtils.java | 80 -------- .../org/apache/storm/rocketmq/SpoutConfig.java | 29 --- .../apache/storm/rocketmq/bolt/RocketMqBolt.java | 216 ------------------- .../mapper/FieldNameBasedTupleToMessageMapper.java | 70 ------- .../common/mapper/TupleToMessageMapper.java | 32 --- .../common/selector/DefaultTopicSelector.java | 45 ---- .../selector/FieldNameBasedTopicSelector.java | 70 ------- .../rocketmq/common/selector/TopicSelector.java | 29 --- .../apache/storm/rocketmq/spout/RocketMqSpout.java | 228 --------------------- .../rocketmq/spout/scheme/KeyValueScheme.java | 27 --- .../spout/scheme/StringKeyValueScheme.java | 40 ---- .../storm/rocketmq/spout/scheme/StringScheme.java | 56 ----- .../rocketmq/trident/state/RocketMqState.java | 132 ------------ .../trident/state/RocketMqStateFactory.java | 43 ---- .../trident/state/RocketMqStateUpdater.java | 35 ---- .../rocketmq/DefaultMessageBodySerializerTest.java | 36 ---- .../java/org/apache/storm/rocketmq/TestUtils.java | 45 ---- .../storm/rocketmq/bolt/RocketMqBoltTest.java | 67 ------ .../FieldNameBasedTupleToMessageMapperTest.java | 36 ---- .../common/selector/DefaultTopicSelectorTest.java | 40 ---- .../selector/FieldNameBasedTopicSelectorTest.java | 40 ---- .../storm/rocketmq/spout/RocketMqSpoutTest.java | 87 -------- .../spout/scheme/StringKeyValueSchemeTest.java | 63 ------ pom.xml | 2 - 34 files changed, 2439 deletions(-) diff --git a/examples/storm-rocketmq-examples/pom.xml b/examples/storm-rocketmq-examples/pom.xml deleted file mode 100644 index c0ae8eb48..000000000 --- a/examples/storm-rocketmq-examples/pom.xml +++ /dev/null @@ -1,96 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>storm</artifactId> - <groupId>org.apache.storm</groupId> - <version>2.6.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>storm-rocketmq-examples</artifactId> - - <dependencies> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <scope>${provided.scope}</scope> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-rocketmq</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <configuration> - <createDependencyReducedPom>true</createDependencyReducedPom> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.sf</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.dsa</exclude> - <exclude>META-INF/*.RSA</exclude> - <exclude>META-INF/*.rsa</exclude> - <exclude>META-INF/*.EC</exclude> - <exclude>META-INF/*.ec</exclude> - <exclude>META-INF/MSFTSIG.SF</exclude> - <exclude>META-INF/MSFTSIG.RSA</exclude> - </excludes> - </filter> - </filters> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> - <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - </transformer> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <!--Note - the version would be inherited--> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-pmd-plugin</artifactId> - </plugin> - </plugins> - </build> -</project> diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java deleted file mode 100644 index 3b40b51fc..000000000 --- a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.topology; - -import java.util.Properties; -import org.apache.storm.Config; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.rocketmq.RocketMqConfig; -import org.apache.storm.rocketmq.SpoutConfig; -import org.apache.storm.rocketmq.bolt.RocketMqBolt; -import org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper; -import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper; -import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector; -import org.apache.storm.rocketmq.common.selector.TopicSelector; -import org.apache.storm.rocketmq.spout.RocketMqSpout; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; - -public class WordCountTopology { - private static final String WORD_SPOUT = "WORD_SPOUT"; - private static final String COUNT_BOLT = "COUNT_BOLT"; - private static final String INSERT_BOLT = "INSERT_BOLT"; - - private static final String CONSUMER_GROUP = "wordcount"; - private static final String CONSUMER_TOPIC = "source"; - - public static StormTopology buildTopology(String nameserverAddr, String topic) { - Properties properties = new Properties(); - properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, nameserverAddr); - properties.setProperty(SpoutConfig.CONSUMER_GROUP, CONSUMER_GROUP); - properties.setProperty(SpoutConfig.CONSUMER_TOPIC, CONSUMER_TOPIC); - - RocketMqSpout spout = new RocketMqSpout(properties); - - TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count"); - TopicSelector selector = new DefaultTopicSelector(topic); - - properties = new Properties(); - properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr); - - RocketMqBolt insertBolt = new RocketMqBolt() - .withMapper(mapper) - .withSelector(selector) - .withProperties(properties); - - // wordSpout ==> countBolt ==> insertBolt - TopologyBuilder builder = new TopologyBuilder(); - - WordCounter bolt = new WordCounter(); - builder.setSpout(WORD_SPOUT, spout, 1); - builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("str")); - builder.setBolt(INSERT_BOLT, insertBolt, 1).shuffleGrouping(COUNT_BOLT); - - return builder.createTopology(); - } - - public static void main(String[] args) throws Exception { - Config conf = new Config(); - conf.setMaxSpoutPending(5); - conf.setNumWorkers(3); - - String topologyName = "wordCounter"; - if (args.length < 2) { - System.out.println("Usage: WordCountTopology <nameserver addr> <topic> [topology name]"); - } else { - if (args.length > 3) { - topologyName = args[2]; - } - StormSubmitter.submitTopology(topologyName, conf, buildTopology(args[0], args[1])); - } - } -} diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java deleted file mode 100644 index dd6820036..000000000 --- a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.topology; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.IBasicBolt; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; - -public class WordCounter implements IBasicBolt { - private Map<String, Integer> wordCounter = new HashMap<>(); - - @Override - public void prepare(Map<String, Object> topoConf, TopologyContext context) { - - } - - @Override - public void execute(Tuple input, BasicOutputCollector collector) { - String word = input.getStringByField("str"); - int count; - if (wordCounter.containsKey(word)) { - count = wordCounter.get(word) + 1; - } else { - count = 1; - } - wordCounter.put(word, count); - collector.emit(new Values(word, count)); - } - - @Override - public void cleanup() { - - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - -} diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java deleted file mode 100644 index dbdee9355..000000000 --- a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.trident; - -import java.util.Properties; -import org.apache.storm.Config; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.rocketmq.RocketMqConfig; -import org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper; -import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper; -import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector; -import org.apache.storm.rocketmq.common.selector.TopicSelector; -import org.apache.storm.rocketmq.trident.state.RocketMqState; -import org.apache.storm.rocketmq.trident.state.RocketMqStateFactory; -import org.apache.storm.rocketmq.trident.state.RocketMqStateUpdater; -import org.apache.storm.trident.Stream; -import org.apache.storm.trident.TridentTopology; -import org.apache.storm.trident.state.StateFactory; -import org.apache.storm.trident.testing.FixedBatchSpout; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -public class WordCountTrident { - - public static StormTopology buildTopology(String nameserverAddr, String topic) { - Fields fields = new Fields("word", "count"); - FixedBatchSpout spout = new FixedBatchSpout(fields, 4, - new Values("storm", 1), - new Values("trident", 1), - new Values("needs", 1), - new Values("javadoc", 1) - ); - spout.setCycle(true); - - TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count"); - TopicSelector selector = new DefaultTopicSelector(topic); - - Properties properties = new Properties(); - properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr); - - RocketMqState.Options options = new RocketMqState.Options() - .withMapper(mapper) - .withSelector(selector) - .withProperties(properties); - - StateFactory factory = new RocketMqStateFactory(options); - - TridentTopology topology = new TridentTopology(); - Stream stream = topology.newStream("spout1", spout); - - stream.partitionPersist(factory, fields, - new RocketMqStateUpdater(), new Fields()); - - return topology.build(); - } - - public static void main(String[] args) throws Exception { - Config conf = new Config(); - conf.setMaxSpoutPending(5); - conf.setNumWorkers(3); - - String topologyName = "wordCounter"; - if (args.length < 2) { - System.out.println("Usage: WordCountTrident <nameserver addr> <topic> [topology name]"); - } else { - if (args.length > 3) { - topologyName = args[2]; - } - StormSubmitter.submitTopology(topologyName, conf, buildTopology(args[0], args[1])); - } - } -} diff --git a/external/storm-rocketmq/README.md b/external/storm-rocketmq/README.md deleted file mode 100644 index 4934cf0e8..000000000 --- a/external/storm-rocketmq/README.md +++ /dev/null @@ -1,150 +0,0 @@ -# Storm RocketMQ - -Storm/Trident integration for [RocketMQ](https://rocketmq.apache.org/). This package includes the core spout, bolt and trident states that allows a storm topology to either write storm tuples into a topic or read from topics in a storm topology. - - -## Read from Topic -The spout included in this package for reading data from a topic. - -### RocketMqSpout -To use the `RocketMqSpout`, you construct an instance of it by specifying a Properties instance which including rocketmq configs. -RocketMqSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer. -RocketMqSpout's messages retrying depends on RocketMQ's push mode retry policy. - - ```java - Properties properties = new Properties(); - properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, nameserverAddr); - properties.setProperty(SpoutConfig.CONSUMER_GROUP, group); - properties.setProperty(SpoutConfig.CONSUMER_TOPIC, topic); - - RocketMqSpout spout = new RocketMqSpout(properties); - ``` - - -## Write into Topic -The bolt and trident state included in this package for write data into a topic. - -### TupleToMessageMapper -The main API for mapping Storm tuple to a RocketMQ Message is the `org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper` interface: - -```java -public interface TupleToMessageMapper extends Serializable { - String getKeyFromTuple(ITuple tuple); - byte[] getValueFromTuple(ITuple tuple); -} -``` - -### FieldNameBasedTupleToMessageMapper -`storm-rocketmq` includes a general purpose `TupleToMessageMapper` implementation called `FieldNameBasedTupleToMessageMapper`. - -### TopicSelector -The main API for selecting topic and tags is the `org.apache.storm.rocketmq.common.selector.TopicSelector` interface: - -```java -public interface TopicSelector extends Serializable { - String getTopic(ITuple tuple); - String getTag(ITuple tuple); -} -``` - -### DefaultTopicSelector/FieldNameBasedTopicSelector -`storm-rocketmq` includes general purpose `TopicSelector` implementations called `DefaultTopicSelector` and `FieldNameBasedTopicSelector`. - - -### RocketMqBolt -To use the `RocketMqBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances. -RocketMqBolt send messages async by default. You can change this by invoking `withAsync(false)`. - - ```java - TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count"); - TopicSelector selector = new DefaultTopicSelector(topic); - - properties = new Properties(); - properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr); - - RocketMqBolt insertBolt = new RocketMqBolt() - .withMapper(mapper) - .withSelector(selector) - .withProperties(properties); - ``` - -### Trident State -We support trident persistent state that can be used with trident topologies. To create a RocketMQ persistent trident state you need to initialize it with the TupleToMessageMapper, TopicSelector, Properties instances. See the example below: - - ```java - TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count"); - TopicSelector selector = new DefaultTopicSelector(topic); - - Properties properties = new Properties(); - properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr); - - RocketMqState.Options options = new RocketMqState.Options() - .withMapper(mapper) - .withSelector(selector) - .withProperties(properties); - - StateFactory factory = new RocketMqStateFactory(options); - - TridentTopology topology = new TridentTopology(); - Stream stream = topology.newStream("spout1", spout); - - stream.partitionPersist(factory, fields, - new RocketMqStateUpdater(), new Fields()); - ``` - -## Configurations - -### Producer Configurations -| NAME | DESCRIPTION | DEFAULT | -| ------------- |:-------------:|:------:| -| nameserver.address | name server address *Required* | null | -| nameserver.poll.interval | name server poll topic info interval | 30000 | -| brokerserver.heartbeat.interval | broker server heartbeat interval | 30000 | -| producer.group | producer group | $UUID | -| producer.retry.times | producer send messages retry times | 3 | -| producer.timeout | producer send messages timeout | 3000 | - - -### Consumer Configurations -| NAME | DESCRIPTION | DEFAULT | -| ------------- |:-------------:|:------:| -| nameserver.address | name server address *Required* | null | -| nameserver.poll.interval | name server poll topic info interval | 30000 | -| brokerserver.heartbeat.interval | broker server heartbeat interval | 30000 | -| consumer.group | consumer group *Required* | null | -| consumer.topic | consumer topic *Required* | null | -| consumer.tag | consumer topic tag | * | -| consumer.offset.reset.to | what to do when there is no initial offset on the server | latest/earliest/timestamp | -| consumer.offset.from.timestamp | the timestamp when `consumer.offset.reset.to=timestamp` was set | $TIMESTAMP | -| consumer.messages.orderly | if the consumer topic is ordered | false | -| consumer.offset.persist.interval | auto commit offset interval | 5000 | -| consumer.min.threads | consumer min threads | 20 | -| consumer.max.threads | consumer max threads | 64 | -| consumer.callback.executor.threads | client callback executor threads | $availableProcessors | -| consumer.batch.size | consumer messages batch size | 32 | -| consumer.batch.process.timeout | consumer messages batch process timeout | $TOPOLOGY_MESSAGE_TIMEOUT_SECS + 10s| - - -## License - -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. - -## Committer Sponsors - - * Xin Wang ([[email protected]](mailto:[email protected])) - diff --git a/external/storm-rocketmq/pom.xml b/external/storm-rocketmq/pom.xml deleted file mode 100644 index 969ac1d30..000000000 --- a/external/storm-rocketmq/pom.xml +++ /dev/null @@ -1,90 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>storm</artifactId> - <groupId>org.apache.storm</groupId> - <version>2.6.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>storm-rocketmq</artifactId> - <name>storm-rocketmq</name> - - <packaging>jar</packaging> - - <developers> - <developer> - <id>vesense</id> - <name>Xin Wang</name> - <email>[email protected]</email> - </developer> - </developers> - - <dependencies> - <!--parent module dependency--> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <scope>${provided.scope}</scope> - </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-client</artifactId> - <version>${rocketmq.version}</version> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - </dependency> - <!--test dependencies --> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-server</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <!--Note - the version would be inherited--> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-pmd-plugin</artifactId> - </plugin> - </plugins> - </build> - -</project> diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerBatchMessage.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerBatchMessage.java deleted file mode 100644 index c32a18a35..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerBatchMessage.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq; - -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class ConsumerBatchMessage<T> { - private final List<T> data; - private CountDownLatch latch; - private boolean hasFailure = false; - - public ConsumerBatchMessage(List<T> data) { - this.data = data; - latch = new CountDownLatch(data.size()); - } - - public boolean waitFinish(long timeout) throws InterruptedException { - return latch.await(timeout, TimeUnit.MILLISECONDS); - } - - public boolean isSuccess() { - return !hasFailure; - } - - public List<T> getData() { - return data; - } - - /** - * Countdown if the sub message is successful. - */ - public void ack() { - latch.countDown(); - } - - /** - * Countdown and fail-fast if the sub message is failed. - */ - public void fail() { - hasFailure = true; - // fail fast - long count = latch.getCount(); - for (int i = 0; i < count; i++) { - latch.countDown(); - } - } -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java deleted file mode 100644 index 7cfaac6c0..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq; - -import java.nio.charset.StandardCharsets; - -public class DefaultMessageBodySerializer implements MessageBodySerializer { - - /** - * Currently, we just convert string to bytes using UTF-8 charset. - * Note: in this way, object.toString() method is invoked. - * @param body RocketMQ Message body - * @return serialized byte array - */ - @Override - public byte[] serialize(Object body) { - if (body == null) { - return null; - } - return body.toString().getBytes(StandardCharsets.UTF_8); - } -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java deleted file mode 100644 index fbf16cd6c..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq; - -import java.io.Serializable; - -/** - * RocketMQ message body serializer. - */ -public interface MessageBodySerializer extends Serializable { - byte[] serialize(Object body); -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java deleted file mode 100644 index 9a239beae..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq; - -import static org.apache.storm.rocketmq.RocketMqUtils.getInteger; - -import java.util.Properties; -import java.util.UUID; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.Validate; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.common.consumer.ConsumeFromWhere; - -/** - * RocketMqConfig for Consumer/Producer. - */ -public class RocketMqConfig { - // common - public static final String NAME_SERVER_ADDR = "nameserver.address"; // Required - - public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval"; - public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds - - public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval"; - public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds - - - // producer - public static final String PRODUCER_GROUP = "producer.group"; - - public static final String PRODUCER_RETRY_TIMES = "producer.retry.times"; - public static final int DEFAULT_PRODUCER_RETRY_TIMES = 3; - - public static final String PRODUCER_TIMEOUT = "producer.timeout"; - public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds - - - // consumer - public static final String CONSUMER_GROUP = "consumer.group"; // Required - - public static final String CONSUMER_TOPIC = "consumer.topic"; // Required - - public static final String CONSUMER_TAG = "consumer.tag"; - public static final String DEFAULT_CONSUMER_TAG = "*"; - - public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to"; - public static final String CONSUMER_OFFSET_LATEST = "latest"; - public static final String CONSUMER_OFFSET_EARLIEST = "earliest"; - public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp"; - public static final String CONSUMER_OFFSET_FROM_TIMESTAMP = "consumer.offset.from.timestamp"; - - public static final String CONSUMER_MESSAGES_ORDERLY = "consumer.messages.orderly"; - - public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval"; - public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds - - public static final String CONSUMER_MIN_THREADS = "consumer.min.threads"; - public static final int DEFAULT_CONSUMER_MIN_THREADS = 20; - - public static final String CONSUMER_MAX_THREADS = "consumer.max.threads"; - public static final int DEFAULT_CONSUMER_MAX_THREADS = 64; - - public static final String CONSUMER_CALLBACK_EXECUTOR_THREADS = "consumer.callback.executor.threads"; - public static final int DEFAULT_CONSUMER_CALLBACK_EXECUTOR_THREADS = Runtime.getRuntime().availableProcessors(); - - public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size"; - public static final int DEFAULT_CONSUMER_BATCH_SIZE = 32; - - public static final String CONSUMER_BATCH_PROCESS_TIMEOUT = "consumer.batch.process.timeout"; - - /** - * Build Producer Configs. - * @param props Properties - * @param producer DefaultMQProducer - */ - public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) { - buildCommonConfigs(props, producer); - - String group = props.getProperty(PRODUCER_GROUP); - if (StringUtils.isEmpty(group)) { - group = UUID.randomUUID().toString(); - } - producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, group)); - - producer.setRetryTimesWhenSendFailed(getInteger(props, - PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES)); - producer.setRetryTimesWhenSendAsyncFailed(getInteger(props, - PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES)); - producer.setSendMsgTimeout(getInteger(props, - PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT)); - } - - /** - * Build Consumer Configs. - * @param props Properties - * @param consumer DefaultMQPushConsumer - */ - public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer) { - buildCommonConfigs(props, consumer); - - String group = props.getProperty(CONSUMER_GROUP); - Validate.notEmpty(group); - consumer.setConsumerGroup(group); - - consumer.setPullBatchSize(getInteger(props, - CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE)); - - consumer.setPersistConsumerOffsetInterval(getInteger(props, - CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL)); - consumer.setConsumeThreadMin(getInteger(props, - CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS)); - consumer.setConsumeThreadMax(getInteger(props, - CONSUMER_MAX_THREADS, DEFAULT_CONSUMER_MAX_THREADS)); - - consumer.setClientCallbackExecutorThreads(getInteger(props, - CONSUMER_CALLBACK_EXECUTOR_THREADS, DEFAULT_CONSUMER_CALLBACK_EXECUTOR_THREADS)); - - String initOffset = props.getProperty(CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST); - switch (initOffset) { - case CONSUMER_OFFSET_EARLIEST: - consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); - break; - case CONSUMER_OFFSET_LATEST: - consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); - break; - case CONSUMER_OFFSET_TIMESTAMP: - String timestamp = props.getProperty(CONSUMER_OFFSET_FROM_TIMESTAMP); - consumer.setConsumeTimestamp(timestamp); - break; - default: - consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); - } - - String topic = props.getProperty(CONSUMER_TOPIC); - Validate.notEmpty(topic); - try { - consumer.subscribe(topic, props.getProperty(CONSUMER_TAG, DEFAULT_CONSUMER_TAG)); - } catch (MQClientException e) { - throw new IllegalArgumentException(e); - } - } - - /** - * Build Common Configs. - * @param props Properties - * @param client ClientConfig - */ - public static void buildCommonConfigs(Properties props, ClientConfig client) { - String nameServers = props.getProperty(NAME_SERVER_ADDR); - Validate.notEmpty(nameServers); - client.setNamesrvAddr(nameServers); - - client.setPollNameServerInterval(getInteger(props, - NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL)); - client.setHeartbeatBrokerInterval(getInteger(props, - BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL)); - } -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java deleted file mode 100644 index a2d7e7a6d..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq; - -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Properties; - -import org.apache.rocketmq.common.message.Message; -import org.apache.storm.rocketmq.spout.scheme.KeyValueScheme; -import org.apache.storm.spout.Scheme; - -public final class RocketMqUtils { - - public static int getInteger(Properties props, String key, int defaultValue) { - return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue))); - } - - public static long getLong(Properties props, String key, long defaultValue) { - return Long.parseLong(props.getProperty(key, String.valueOf(defaultValue))); - } - - public static boolean getBoolean(Properties props, String key, boolean defaultValue) { - return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue))); - } - - /** - * Create Scheme by Properties. - * @param props Properties - * @return Scheme - */ - public static Scheme createScheme(Properties props) { - String schemeString = props.getProperty(SpoutConfig.SCHEME, SpoutConfig.DEFAULT_SCHEME); - Scheme scheme; - try { - Class clazz = Class.forName(schemeString); - scheme = (Scheme) clazz.newInstance(); - } catch (Exception e) { - throw new IllegalArgumentException("Cannot create Scheme for " + schemeString - + " due to " + e.getMessage()); - } - return scheme; - } - - /** - * Generate Storm tuple values by Message and Scheme. - * @param msg RocketMQ Message - * @param scheme Scheme for deserializing - * @return tuple values - */ - public static List<Object> generateTuples(Message msg, Scheme scheme) { - List<Object> tup; - String rawKey = msg.getKeys(); - ByteBuffer body = ByteBuffer.wrap(msg.getBody()); - if (rawKey != null && scheme instanceof KeyValueScheme) { - ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8)); - tup = ((KeyValueScheme) scheme).deserializeKeyAndValue(key, body); - } else { - tup = scheme.deserialize(body); - } - return tup; - } -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java deleted file mode 100644 index 83e956d53..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq; - -import org.apache.storm.rocketmq.spout.scheme.StringScheme; - -public class SpoutConfig extends RocketMqConfig { - - public static final String SCHEME = "spout.scheme"; - public static final String DEFAULT_SCHEME = StringScheme.class.getName(); - - -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java deleted file mode 100644 index 4cfa9f1d9..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.bolt; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import org.apache.commons.lang.Validate; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.common.message.Message; -import org.apache.storm.Config; -import org.apache.storm.rocketmq.RocketMqConfig; -import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper; -import org.apache.storm.rocketmq.common.selector.TopicSelector; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.IRichBolt; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.BatchHelper; -import org.apache.storm.utils.TupleUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RocketMqBolt implements IRichBolt { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(RocketMqBolt.class); - - private static final int DEFAULT_FLUSH_INTERVAL_SECS = 5; - private static final int DEFAULT_BATCH_SIZE = 20; - - private DefaultMQProducer producer; - private OutputCollector collector; - private TopicSelector selector; - private TupleToMessageMapper mapper; - private Properties properties; - - private boolean async = true; - private boolean batch = false; - private int batchSize = DEFAULT_BATCH_SIZE; - private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS; - private BatchHelper batchHelper; - private List<Message> messages; - - @Override - public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { - Validate.notEmpty(properties, "Producer properties can not be empty"); - Validate.notNull(selector, "TopicSelector can not be null"); - Validate.notNull(mapper, "TupleToMessageMapper can not be null"); - - producer = new DefaultMQProducer(); - producer.setInstanceName(String.valueOf(context.getThisTaskId())); - RocketMqConfig.buildProducerConfigs(properties, producer); - - try { - producer.start(); - } catch (MQClientException e) { - throw new RuntimeException(e); - } - - this.collector = collector; - this.batchHelper = new BatchHelper(batchSize, collector); - this.messages = new LinkedList<>(); - } - - public RocketMqBolt withSelector(TopicSelector selector) { - this.selector = selector; - return this; - } - - public RocketMqBolt withMapper(TupleToMessageMapper mapper) { - this.mapper = mapper; - return this; - } - - public RocketMqBolt withAsync(boolean async) { - this.async = async; - return this; - } - - public RocketMqBolt withBatch(boolean batch) { - this.batch = batch; - return this; - } - - public RocketMqBolt withBatchSize(int batchSize) { - this.batchSize = batchSize; - return this; - } - - public RocketMqBolt withFlushIntervalSecs(int flushIntervalSecs) { - this.flushIntervalSecs = flushIntervalSecs; - return this; - } - - public RocketMqBolt withProperties(Properties properties) { - this.properties = properties; - return this; - } - - @Override - public void execute(Tuple input) { - if (!batch && TupleUtils.isTick(input)) { - return; - } - - String topic = selector.getTopic(input); - if (topic == null) { - LOG.warn("skipping Message due to topic selector returned null."); - collector.ack(input); - return; - } - - if (batch) { - // batch sync sending - try { - if (batchHelper.shouldHandle(input)) { - batchHelper.addBatch(input); - messages.add(prepareMessage(input)); - } - - if (batchHelper.shouldFlush()) { - producer.send(messages); - batchHelper.ack(); - messages.clear(); - } - } catch (Exception e) { - LOG.error("Batch send messages failure!", e); - batchHelper.fail(e); - messages.clear(); - } - } else { - if (async) { - // async sending - try { - producer.send(prepareMessage(input), new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - collector.ack(input); - } - - @Override - public void onException(Throwable throwable) { - if (throwable != null) { - LOG.error("Async send messages failure!", throwable); - collector.reportError(throwable); - collector.fail(input); - } - } - }); - } catch (Exception e) { - LOG.error("Async send messages failure!", e); - collector.reportError(e); - collector.fail(input); - } - } else { - // sync sending, will return a SendResult - try { - producer.send(prepareMessage(input)); - collector.ack(input); - } catch (Exception e) { - LOG.error("Sync send messages failure!", e); - collector.reportError(e); - collector.fail(input); - } - } - } - } - - // Mapping: from storm tuple -> rocketmq Message - private Message prepareMessage(Tuple input) { - String topic = selector.getTopic(input); - String tag = selector.getTag(input); - String key = mapper.getKeyFromTuple(input); - byte[] value = mapper.getValueFromTuple(input); - - return new Message(topic, tag, key, value); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return TupleUtils.putTickFrequencyIntoComponentConfig(new Config(), flushIntervalSecs); - } - - @Override - public void cleanup() { - if (producer != null) { - producer.shutdown(); - } - } -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java deleted file mode 100644 index a38a36539..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.common.mapper; - -import org.apache.storm.rocketmq.DefaultMessageBodySerializer; -import org.apache.storm.rocketmq.MessageBodySerializer; -import org.apache.storm.tuple.ITuple; - -public class FieldNameBasedTupleToMessageMapper implements TupleToMessageMapper { - public static final String BOLT_KEY = "key"; - public static final String BOLT_MESSAGE = "message"; - public String boltKeyField; - public String boltMessageField; - private MessageBodySerializer messageBodySerializer; - - public FieldNameBasedTupleToMessageMapper() { - this(BOLT_KEY, BOLT_MESSAGE); - } - - /** - * FieldNameBasedTupleToMessageMapper Constructor. - * @param boltKeyField tuple field for selecting the key - * @param boltMessageField tuple field for selecting the value - */ - public FieldNameBasedTupleToMessageMapper(String boltKeyField, String boltMessageField) { - this.boltKeyField = boltKeyField; - this.boltMessageField = boltMessageField; - this.messageBodySerializer = new DefaultMessageBodySerializer(); - } - - @Override - public String getKeyFromTuple(ITuple tuple) { - return tuple.getStringByField(boltKeyField); - } - - @Override - public byte[] getValueFromTuple(ITuple tuple) { - Object obj = tuple.getValueByField(boltMessageField); - if (obj == null) { - return null; - } - return messageBodySerializer.serialize(obj); - } - - /** - * using this method can override the default MessageBodySerializer. - * @param serializer MessageBodySerializer - * @return this object - */ - public FieldNameBasedTupleToMessageMapper withMessageBodySerializer(MessageBodySerializer serializer) { - this.messageBodySerializer = serializer; - return this; - } -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java deleted file mode 100644 index ef716a4c4..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.common.mapper; - -import java.io.Serializable; -import org.apache.storm.tuple.ITuple; - -/** - * Interface defining a mapping from storm tuple to rocketmq key and message. - */ -public interface TupleToMessageMapper extends Serializable { - - String getKeyFromTuple(ITuple tuple); - - byte[] getValueFromTuple(ITuple tuple); -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java deleted file mode 100644 index e0192da68..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.common.selector; - -import org.apache.storm.tuple.ITuple; - -public class DefaultTopicSelector implements TopicSelector { - private final String topicName; - private final String tagName; - - public DefaultTopicSelector(final String topicName, final String tagName) { - this.topicName = topicName; - this.tagName = tagName; - } - - public DefaultTopicSelector(final String topicName) { - this(topicName, ""); - } - - @Override - public String getTopic(ITuple tuple) { - return topicName; - } - - @Override - public String getTag(ITuple tuple) { - return tagName; - } -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java deleted file mode 100644 index 253221ee5..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.common.selector; - -import org.apache.storm.tuple.ITuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Uses field name to select topic and tag name from tuple. - */ -public class FieldNameBasedTopicSelector implements TopicSelector { - private static final Logger LOG = LoggerFactory.getLogger(FieldNameBasedTopicSelector.class); - - private final String topicFieldName; - private final String defaultTopicName; - - private final String tagFieldName; - private final String defaultTagName; - - /** - * FieldNameBasedTopicSelector Constructor. - * @param topicFieldName field name used for selecting topic - * @param defaultTopicName default field name used for selecting topic - * @param tagFieldName field name used for selecting tag - * @param defaultTagName default field name used for selecting tag - */ - public FieldNameBasedTopicSelector(String topicFieldName, String defaultTopicName, String tagFieldName, String defaultTagName) { - this.topicFieldName = topicFieldName; - this.defaultTopicName = defaultTopicName; - this.tagFieldName = tagFieldName; - this.defaultTagName = defaultTagName; - } - - @Override - public String getTopic(ITuple tuple) { - if (tuple.contains(topicFieldName)) { - return tuple.getStringByField(topicFieldName); - } else { - LOG.warn("Field {} Not Found. Returning default topic {}", topicFieldName, defaultTopicName); - return defaultTopicName; - } - } - - @Override - public String getTag(ITuple tuple) { - if (tuple.contains(tagFieldName)) { - return tuple.getStringByField(tagFieldName); - } else { - LOG.warn("Field {} Not Found. Returning default tag {}", tagFieldName, defaultTagName); - return defaultTagName; - } - } -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java deleted file mode 100644 index 2940920fe..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.common.selector; - -import java.io.Serializable; -import org.apache.storm.tuple.ITuple; - -public interface TopicSelector extends Serializable { - - String getTopic(ITuple tuple); - - String getTag(ITuple tuple); -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java deleted file mode 100644 index c08dd6263..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.spout; - -import static org.apache.storm.rocketmq.RocketMqUtils.getBoolean; -import static org.apache.storm.rocketmq.RocketMqUtils.getLong; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.commons.lang.Validate; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.storm.Config; -import org.apache.storm.rocketmq.ConsumerBatchMessage; -import org.apache.storm.rocketmq.RocketMqConfig; -import org.apache.storm.rocketmq.RocketMqUtils; -import org.apache.storm.spout.Scheme; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.IRichSpout; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * RocketMqSpout uses MQPushConsumer as the default implementation. - * PushConsumer is a high level consumer API, wrapping the pulling details - * Looks like broker push messages to consumer - */ -public class RocketMqSpout implements IRichSpout { - // TODO add metrics - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(RocketMqSpout.class); - - private DefaultMQPushConsumer consumer; - private SpoutOutputCollector collector; - private BlockingQueue<ConsumerBatchMessage<List<Object>>> queue; - private Map<String, ConsumerBatchMessage<List<Object>>> cache; - - private Properties properties; - private Scheme scheme; - private long batchProcessTimeout; - - /** - * RocketMqSpout Constructor. - * @param properties Properties Config - */ - public RocketMqSpout(Properties properties) { - Validate.notEmpty(properties, "Consumer properties can not be empty"); - this.properties = properties; - scheme = RocketMqUtils.createScheme(properties); - } - - @Override - public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - consumer = new DefaultMQPushConsumer(); - consumer.setInstanceName(String.valueOf(context.getThisTaskId())); - RocketMqConfig.buildConsumerConfigs(properties, consumer); - - boolean ordered = getBoolean(properties, RocketMqConfig.CONSUMER_MESSAGES_ORDERLY, false); - if (ordered) { - consumer.registerMessageListener(new MessageListenerOrderly() { - @Override - public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeOrderlyContext context) { - if (process(msgs)) { - return ConsumeOrderlyStatus.SUCCESS; - } else { - return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; - } - } - }); - } else { - consumer.registerMessageListener(new MessageListenerConcurrently() { - @Override - public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { - if (process(msgs)) { - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } else { - return ConsumeConcurrentlyStatus.RECONSUME_LATER; - } - } - }); - } - - try { - consumer.start(); - } catch (MQClientException e) { - LOG.error("Failed to start RocketMQ consumer.", e); - throw new RuntimeException(e); - } - - long defaultBatchProcessTimeout = (long) conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30) * 1000 + 10000; - batchProcessTimeout = getLong(properties, RocketMqConfig.CONSUMER_BATCH_PROCESS_TIMEOUT, defaultBatchProcessTimeout); - - queue = new LinkedBlockingQueue<>(); - cache = new ConcurrentHashMap<>(); - this.collector = collector; - } - - /** - * Process pushed messages. - * @param msgs messages - * @return the boolean flag processed result - */ - protected boolean process(List<MessageExt> msgs) { - if (msgs.isEmpty()) { - return true; - } - - List<List<Object>> list = new ArrayList<>(msgs.size()); - for (MessageExt msg : msgs) { - List<Object> data = RocketMqUtils.generateTuples(msg, scheme); - if (data != null) { - list.add(data); - } - } - ConsumerBatchMessage<List<Object>> batchMessage = new ConsumerBatchMessage<>(list); - try { - queue.put(batchMessage); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - boolean isCompleted; - try { - isCompleted = batchMessage.waitFinish(batchProcessTimeout); - } catch (InterruptedException e) { - LOG.error("Interrupted when waiting messages to be finished.", e); - throw new RuntimeException(e); - } - - boolean isSuccess = batchMessage.isSuccess(); - - return isCompleted && isSuccess; - } - - @Override - public void nextTuple() { - ConsumerBatchMessage<List<Object>> batchMessage = queue.poll(); - if (batchMessage != null) { - List<List<Object>> list = batchMessage.getData(); - for (List<Object> data : list) { - String messageId = UUID.randomUUID().toString(); - cache.put(messageId, batchMessage); - collector.emit(data, messageId); - } - } - } - - @Override - public void ack(Object msgId) { - ConsumerBatchMessage batchMessage = cache.get(msgId); - batchMessage.ack(); - cache.remove(msgId); - LOG.debug("Message acked {}", batchMessage); - } - - @Override - public void fail(Object msgId) { - ConsumerBatchMessage batchMessage = cache.get(msgId); - batchMessage.fail(); - cache.remove(msgId); - LOG.debug("Message failed {}", batchMessage); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(scheme.getOutputFields()); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - - @Override - public void close() { - if (consumer != null) { - consumer.shutdown(); - } - } - - @Override - public void activate() { - if (consumer != null) { - consumer.resume(); - } - } - - @Override - public void deactivate() { - if (consumer != null) { - consumer.suspend(); - } - } -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java deleted file mode 100644 index f4cf803bb..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.spout.scheme; - -import java.nio.ByteBuffer; -import java.util.List; -import org.apache.storm.spout.Scheme; - -public interface KeyValueScheme extends Scheme { - List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value); -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java deleted file mode 100644 index 21a762e1f..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.spout.scheme; - -import com.google.common.collect.ImmutableMap; - -import java.nio.ByteBuffer; -import java.util.List; - -import org.apache.storm.tuple.Values; - -public class StringKeyValueScheme extends StringScheme implements KeyValueScheme { - - @Override - public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) { - if (key == null) { - return deserialize(value); - } - String keyString = StringScheme.deserializeString(key); - String valueString = StringScheme.deserializeString(value); - return new Values(ImmutableMap.of(keyString, valueString)); - } - -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java deleted file mode 100644 index 53fdc57aa..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.spout.scheme; - -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.List; -import org.apache.storm.spout.Scheme; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; - -public class StringScheme implements Scheme { - public static final String STRING_SCHEME_KEY = "str"; - - @Override - public List<Object> deserialize(ByteBuffer bytes) { - return new Values(deserializeString(bytes)); - } - - /** - * Deserialize ByteBuffer to String. - * @param byteBuffer input ByteBuffer - * @return deserialized string - */ - public static String deserializeString(ByteBuffer byteBuffer) { - if (byteBuffer.hasArray()) { - int base = byteBuffer.arrayOffset(); - return new String(byteBuffer.array(), base + byteBuffer.position(), byteBuffer.remaining(), - StandardCharsets.UTF_8); - } else { - return new String(Utils.toByteArray(byteBuffer), StandardCharsets.UTF_8); - } - } - - @Override - public Fields getOutputFields() { - return new Fields(STRING_SCHEME_KEY); - } -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java deleted file mode 100644 index c42bd4711..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.trident.state; - -import java.io.Serializable; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.UUID; - -import org.apache.commons.lang.Validate; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.common.message.Message; -import org.apache.storm.rocketmq.RocketMqConfig; -import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper; -import org.apache.storm.rocketmq.common.selector.TopicSelector; -import org.apache.storm.topology.FailedException; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.state.State; -import org.apache.storm.trident.tuple.TridentTuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RocketMqState implements State { - - private static final Logger LOG = LoggerFactory.getLogger(RocketMqState.class); - - private Options options; - private DefaultMQProducer producer; - - protected RocketMqState(Map<String, Object> map, Options options) { - this.options = options; - } - - public static class Options implements Serializable { - private TopicSelector selector; - private TupleToMessageMapper mapper; - private Properties properties; - - public Options withSelector(TopicSelector selector) { - this.selector = selector; - return this; - } - - public Options withMapper(TupleToMessageMapper mapper) { - this.mapper = mapper; - return this; - } - - public Options withProperties(Properties properties) { - this.properties = properties; - return this; - } - } - - protected void prepare() { - Validate.notEmpty(options.properties, "Producer properties can not be empty"); - Validate.notNull(options.selector, "TopicSelector can not be null"); - Validate.notNull(options.mapper, "TupleToMessageMapper can not be null"); - - producer = new DefaultMQProducer(); - producer.setInstanceName(UUID.randomUUID().toString()); - RocketMqConfig.buildProducerConfigs(options.properties, producer); - - try { - producer.start(); - } catch (MQClientException e) { - throw new RuntimeException(e); - } - } - - @Override - public void beginCommit(Long txid) { - LOG.debug("beginCommit is noop."); - } - - @Override - public void commit(Long txid) { - LOG.debug("commit is noop."); - } - - /** - * Update the RocketMQ state. - * @param tuples trident tuples - * @param collector trident collector - */ - public void updateState(List<TridentTuple> tuples, TridentCollector collector) { - List<Message> messages = new LinkedList<>(); - - for (TridentTuple tuple : tuples) { - String topic = options.selector.getTopic(tuple); - String tag = options.selector.getTag(tuple); - String key = options.mapper.getKeyFromTuple(tuple); - byte[] value = options.mapper.getValueFromTuple(tuple); - - if (topic == null) { - LOG.warn("skipping Message with Key = " + key + ", topic selector returned null."); - continue; - } - - Message msg = new Message(topic, tag, key, value); - messages.add(msg); - } - - try { - this.producer.send(messages); - } catch (Exception e) { - LOG.warn("Batch write failed. Triggering replay.", e); - collector.reportError(e); - throw new FailedException(e); - } - } - -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java deleted file mode 100644 index 64b8277f4..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.trident.state; - -import java.util.Map; - -import org.apache.storm.task.IMetricsContext; -import org.apache.storm.trident.state.State; -import org.apache.storm.trident.state.StateFactory; - -public class RocketMqStateFactory implements StateFactory { - - private RocketMqState.Options options; - - public RocketMqStateFactory(RocketMqState.Options options) { - this.options = options; - } - - @Override - public State makeState(Map<String, Object> conf, IMetricsContext metrics, - int partitionIndex, int numPartitions) { - RocketMqState state = new RocketMqState(conf, options); - state.prepare(); - return state; - } - -} diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java deleted file mode 100644 index 589ac5e3f..000000000 --- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.trident.state; - -import java.util.List; - -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.state.BaseStateUpdater; -import org.apache.storm.trident.tuple.TridentTuple; - -public class RocketMqStateUpdater extends BaseStateUpdater<RocketMqState> { - - @Override - public void updateState(RocketMqState state, List<TridentTuple> tuples, - TridentCollector collector) { - state.updateState(tuples, collector); - } - -} diff --git a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/DefaultMessageBodySerializerTest.java b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/DefaultMessageBodySerializerTest.java deleted file mode 100644 index dd8ed06f3..000000000 --- a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/DefaultMessageBodySerializerTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq; - -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; - -public class DefaultMessageBodySerializerTest { - @Test - public void serialize() { - DefaultMessageBodySerializer messageBodySerializer = new DefaultMessageBodySerializer(); - String body = "this is message body data"; - assertArrayEquals(body.getBytes(), messageBodySerializer.serialize(body)); - - body = null; - assertArrayEquals(null, messageBodySerializer.serialize(body)); - } - -} \ No newline at end of file diff --git a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestUtils.java b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestUtils.java deleted file mode 100644 index 399445594..000000000 --- a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestUtils.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq; - -import java.lang.reflect.Field; -import java.util.Arrays; - -import org.apache.storm.Testing; -import org.apache.storm.testing.MkTupleParam; -import org.apache.storm.tuple.Tuple; - -public class TestUtils { - public static void setFieldValue(Object obj, String fieldName, Object value) { - try { - Field field = obj.getClass().getDeclaredField(fieldName); - field.setAccessible(true); - field.set(obj, value); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public static Tuple generateTestTuple(String field1, String field2, Object value1, Object value2) { - MkTupleParam param = new MkTupleParam(); - param.setFields(field1, field2); - Tuple testTuple = Testing.testTuple(Arrays.asList(value1, value2), param); - return testTuple; - } -} diff --git a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/bolt/RocketMqBoltTest.java b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/bolt/RocketMqBoltTest.java deleted file mode 100644 index 76b016764..000000000 --- a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/bolt/RocketMqBoltTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.bolt; - -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.common.message.Message; -import org.apache.storm.rocketmq.TestUtils; -import org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper; -import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector; -import org.apache.storm.tuple.Tuple; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - -public class RocketMqBoltTest { - - private RocketMqBolt rocketMqBolt; - private DefaultMQProducer producer; - - @BeforeEach - public void setUp() { - rocketMqBolt = new RocketMqBolt(); - rocketMqBolt.withSelector(new DefaultTopicSelector("tpc")); - rocketMqBolt.withMapper(new FieldNameBasedTupleToMessageMapper("f1", "f2")); - rocketMqBolt.withBatch(false); - rocketMqBolt.withBatchSize(5); - rocketMqBolt.withAsync(true); - rocketMqBolt.withFlushIntervalSecs(5); - - producer = mock(DefaultMQProducer.class); - TestUtils.setFieldValue(rocketMqBolt, "producer", producer); - } - - @Test - public void execute() throws Exception { - Tuple tuple = TestUtils.generateTestTuple("f1", "f2", "v1", "v2"); - rocketMqBolt.execute(tuple); - - verify(producer).send(any(Message.class), any(SendCallback.class)); - } - - @Test - public void cleanup() { - rocketMqBolt.cleanup(); - verify(producer).shutdown(); - } -} \ No newline at end of file diff --git a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapperTest.java b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapperTest.java deleted file mode 100644 index 885eb5ca9..000000000 --- a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapperTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.common.mapper; - -import org.apache.storm.rocketmq.TestUtils; -import org.apache.storm.tuple.Tuple; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class FieldNameBasedTupleToMessageMapperTest { - @Test - public void withMessageBodySerializer() throws Exception { - FieldNameBasedTupleToMessageMapper messageMapper = new FieldNameBasedTupleToMessageMapper("f1", "f2"); - Tuple tuple = TestUtils.generateTestTuple("f1", "f2", "v1", "v2"); - assertEquals("v1", messageMapper.getKeyFromTuple(tuple)); - assertArrayEquals("v2".getBytes(), messageMapper.getValueFromTuple(tuple)); - } -} \ No newline at end of file diff --git a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelectorTest.java b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelectorTest.java deleted file mode 100644 index 841d5cd9f..000000000 --- a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelectorTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.common.selector; - -import org.apache.storm.tuple.Tuple; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class DefaultTopicSelectorTest { - - @Test - public void getTopic() { - DefaultTopicSelector topicSelector = new DefaultTopicSelector("tpc", "tg"); - Tuple tuple = null; - assertEquals("tpc", topicSelector.getTopic(tuple)); - assertEquals("tg", topicSelector.getTag(tuple)); - - topicSelector = new DefaultTopicSelector("tpc2"); - assertEquals("tpc2", topicSelector.getTopic(tuple)); - assertEquals("", topicSelector.getTag(tuple)); - } - -} \ No newline at end of file diff --git a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelectorTest.java b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelectorTest.java deleted file mode 100644 index 0d2365688..000000000 --- a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelectorTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.common.selector; - -import org.apache.storm.rocketmq.TestUtils; -import org.apache.storm.tuple.Tuple; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class FieldNameBasedTopicSelectorTest { - @Test - public void getTopic() { - FieldNameBasedTopicSelector topicSelector = new FieldNameBasedTopicSelector("f1", "tpc", "f2", "tg"); - Tuple tuple = TestUtils.generateTestTuple("f1", "fn", "v1", "vn"); - assertEquals("v1", topicSelector.getTopic(tuple)); - assertEquals("tg", topicSelector.getTag(tuple)); - - tuple = TestUtils.generateTestTuple("fn", "f2", "vn", "v2"); - assertEquals("tpc", topicSelector.getTopic(tuple)); - assertEquals("v2", topicSelector.getTag(tuple)); - } - -} \ No newline at end of file diff --git a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/spout/RocketMqSpoutTest.java b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/spout/RocketMqSpoutTest.java deleted file mode 100644 index f036be134..000000000 --- a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/spout/RocketMqSpoutTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.spout; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.common.message.Message; -import org.apache.storm.rocketmq.ConsumerBatchMessage; -import org.apache.storm.rocketmq.RocketMqUtils; -import org.apache.storm.rocketmq.SpoutConfig; -import org.apache.storm.rocketmq.TestUtils; -import org.apache.storm.spout.SpoutOutputCollector; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - -public class RocketMqSpoutTest { - private RocketMqSpout spout; - private DefaultMQPushConsumer consumer; - private SpoutOutputCollector collector; - private BlockingQueue<ConsumerBatchMessage> queue; - private Properties properties; - - @BeforeEach - public void setUp() { - properties = new Properties(); - properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, "address"); - properties.setProperty(SpoutConfig.CONSUMER_GROUP, "group"); - properties.setProperty(SpoutConfig.CONSUMER_TOPIC, "topic"); - - spout = new RocketMqSpout(properties); - consumer = mock(DefaultMQPushConsumer.class); - TestUtils.setFieldValue(spout, "consumer", consumer); - collector = mock(SpoutOutputCollector.class); - TestUtils.setFieldValue(spout, "collector", collector); - - queue = new LinkedBlockingQueue<>(); - TestUtils.setFieldValue(spout, "queue", queue); - - Map<String, ConsumerBatchMessage> cache = mock(Map.class); - TestUtils.setFieldValue(spout, "cache", cache); - } - - @Test - public void nextTuple() throws Exception { - List<List<Object>> list = new ArrayList<>(); - list.add(RocketMqUtils.generateTuples(new Message("tpc", "body".getBytes()), - RocketMqUtils.createScheme(properties))); - ConsumerBatchMessage<List<Object>> batchMessage = new ConsumerBatchMessage<>(list); - queue.put(batchMessage); - - spout.nextTuple(); - verify(collector).emit(anyList(), anyString()); - } - - @Test - public void close() { - spout.close(); - verify(consumer).shutdown(); - } -} \ No newline at end of file diff --git a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueSchemeTest.java b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueSchemeTest.java deleted file mode 100644 index dbb59a51c..000000000 --- a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueSchemeTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.rocketmq.spout.scheme; - -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.Collections; - -import com.google.common.collect.ImmutableMap; -import org.apache.storm.tuple.Fields; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class StringKeyValueSchemeTest { - - private StringKeyValueScheme scheme = new StringKeyValueScheme(); - - @Test - public void testDeserialize() throws Exception { - assertEquals(Collections.singletonList("test"), scheme.deserialize(wrapString("test"))); - } - - @Test - public void testGetOutputFields() throws Exception { - Fields outputFields = scheme.getOutputFields(); - assertTrue(outputFields.contains(StringScheme.STRING_SCHEME_KEY)); - assertEquals(1, outputFields.size()); - } - - @Test - public void testDeserializeWithNullKeyAndValue() throws Exception { - assertEquals(Collections.singletonList("test"), - scheme.deserializeKeyAndValue(null, wrapString("test"))); - } - - @Test - public void testDeserializeWithKeyAndValue() throws Exception { - assertEquals(Collections.singletonList(ImmutableMap.of("key", "test")), - scheme.deserializeKeyAndValue(wrapString("key"), wrapString("test"))); - } - - private static ByteBuffer wrapString(String s) { - return ByteBuffer.wrap(s.getBytes(Charset.defaultCharset())); - } -} diff --git a/pom.xml b/pom.xml index 583e94607..efedc0ed1 100644 --- a/pom.xml +++ b/pom.xml @@ -499,7 +499,6 @@ <module>external/storm-kafka-migration</module> <module>external/storm-kafka-monitor</module> <module>external/storm-jms</module> - <module>external/storm-rocketmq</module> <module>external/storm-blobstore-migration</module> <module>integration-test</module> @@ -524,7 +523,6 @@ <module>examples/storm-hive-examples</module> <module>examples/storm-elasticsearch-examples</module> <module>examples/storm-jms-examples</module> - <module>examples/storm-rocketmq-examples</module> <module>examples/storm-perf</module> </modules> </profile>
