This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch STORM-3988 in repository https://gitbox.apache.org/repos/asf/storm.git
commit ee7d71c5ec4bd36dcc86cb1edf436d423180c57f Author: Richard Zowalla <[email protected]> AuthorDate: Thu Oct 19 08:58:34 2023 +0200 STORM-3988 - Remove "storm-mongodb" --- examples/storm-mongodb-examples/pom.xml | 96 ------ .../storm/mongodb/topology/InsertWordCount.java | 72 ----- .../storm/mongodb/topology/LookupWordCount.java | 76 ----- .../storm/mongodb/topology/TotalWordCounter.java | 74 ----- .../storm/mongodb/topology/UpdateWordCount.java | 83 ------ .../apache/storm/mongodb/topology/WordCounter.java | 71 ----- .../apache/storm/mongodb/topology/WordSpout.java | 94 ------ .../storm/mongodb/trident/PrintFunction.java | 41 --- .../storm/mongodb/trident/WordCountTrident.java | 87 ------ .../storm/mongodb/trident/WordCountTridentMap.java | 89 ------ external/storm-mongodb/README.md | 325 --------------------- external/storm-mongodb/pom.xml | 82 ------ .../storm/mongodb/bolt/AbstractMongoBolt.java | 62 ---- .../apache/storm/mongodb/bolt/MongoInsertBolt.java | 124 -------- .../apache/storm/mongodb/bolt/MongoLookupBolt.java | 86 ------ .../apache/storm/mongodb/bolt/MongoUpdateBolt.java | 93 ------ .../apache/storm/mongodb/common/MongoDbClient.java | 109 ------- .../apache/storm/mongodb/common/MongoUtils.java | 44 --- .../storm/mongodb/common/QueryFilterCreator.java | 47 --- .../mongodb/common/SimpleQueryFilterCreator.java | 47 --- .../mongodb/common/mapper/MongoLookupMapper.java | 45 --- .../storm/mongodb/common/mapper/MongoMapper.java | 47 --- .../mongodb/common/mapper/MongoUpdateMapper.java | 26 -- .../common/mapper/SimpleMongoLookupMapper.java | 64 ---- .../mongodb/common/mapper/SimpleMongoMapper.java | 55 ---- .../common/mapper/SimpleMongoUpdateMapper.java | 47 --- .../storm/mongodb/trident/state/MongoMapState.java | 213 -------------- .../storm/mongodb/trident/state/MongoState.java | 145 --------- .../mongodb/trident/state/MongoStateFactory.java | 43 --- .../mongodb/trident/state/MongoStateQuery.java | 41 --- .../mongodb/trident/state/MongoStateUpdater.java | 35 --- pom.xml | 2 - 32 files changed, 2565 deletions(-) diff --git a/examples/storm-mongodb-examples/pom.xml b/examples/storm-mongodb-examples/pom.xml deleted file mode 100644 index f7b0b78b3..000000000 --- a/examples/storm-mongodb-examples/pom.xml +++ /dev/null @@ -1,96 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>storm</artifactId> - <groupId>org.apache.storm</groupId> - <version>2.6.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>storm-mongodb-examples</artifactId> - - <dependencies> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <scope>${provided.scope}</scope> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-mongodb</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <configuration> - <createDependencyReducedPom>true</createDependencyReducedPom> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.sf</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.dsa</exclude> - <exclude>META-INF/*.RSA</exclude> - <exclude>META-INF/*.rsa</exclude> - <exclude>META-INF/*.EC</exclude> - <exclude>META-INF/*.ec</exclude> - <exclude>META-INF/MSFTSIG.SF</exclude> - <exclude>META-INF/MSFTSIG.RSA</exclude> - </excludes> - </filter> - </filters> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> - <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - </transformer> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <!--Note - the version would be inherited--> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-pmd-plugin</artifactId> - </plugin> - </plugins> - </build> -</project> diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java deleted file mode 100644 index f877fca07..000000000 --- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.topology; - -import org.apache.storm.Config; -import org.apache.storm.StormSubmitter; -import org.apache.storm.mongodb.bolt.MongoInsertBolt; -import org.apache.storm.mongodb.common.mapper.MongoMapper; -import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; - -public class InsertWordCount { - private static final String WORD_SPOUT = "WORD_SPOUT"; - private static final String COUNT_BOLT = "COUNT_BOLT"; - private static final String INSERT_BOLT = "INSERT_BOLT"; - - private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test"; - private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount"; - - - public static void main(String[] args) throws Exception { - String url = TEST_MONGODB_URL; - String collectionName = TEST_MONGODB_COLLECTION_NAME; - - if (args.length >= 2) { - url = args[0]; - collectionName = args[1]; - } - - WordSpout spout = new WordSpout(); - WordCounter bolt = new WordCounter(); - - MongoMapper mapper = new SimpleMongoMapper() - .withFields("word", "count"); - - MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper); - - // wordSpout ==> countBolt ==> MongoInsertBolt - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout(WORD_SPOUT, spout, 1); - builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT); - builder.setBolt(INSERT_BOLT, insertBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word")); - - String topoName = "test"; - if (args.length == 3) { - topoName = args[2]; - } else if (args.length > 3) { - System.out.println("Usage: InsertWordCount <mongodb url> <mongodb collection> [topology name]"); - return; - } - Config config = new Config(); - StormSubmitter.submitTopology(topoName, config, builder.createTopology()); - } -} diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java deleted file mode 100644 index 6aaec4208..000000000 --- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.topology; - -import org.apache.storm.Config; -import org.apache.storm.StormSubmitter; -import org.apache.storm.mongodb.bolt.MongoLookupBolt; -import org.apache.storm.mongodb.common.QueryFilterCreator; -import org.apache.storm.mongodb.common.SimpleQueryFilterCreator; -import org.apache.storm.mongodb.common.mapper.MongoLookupMapper; -import org.apache.storm.mongodb.common.mapper.SimpleMongoLookupMapper; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; - -public class LookupWordCount { - private static final String WORD_SPOUT = "WORD_SPOUT"; - private static final String LOOKUP_BOLT = "LOOKUP_BOLT"; - private static final String TOTAL_COUNT_BOLT = "TOTAL_COUNT_BOLT"; - - private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test"; - private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount"; - - public static void main(String[] args) throws Exception { - String url = TEST_MONGODB_URL; - String collectionName = TEST_MONGODB_COLLECTION_NAME; - - if (args.length >= 2) { - url = args[0]; - collectionName = args[1]; - } - - WordSpout spout = new WordSpout(); - TotalWordCounter totalBolt = new TotalWordCounter(); - - MongoLookupMapper mapper = new SimpleMongoLookupMapper() - .withFields("word", "count"); - - QueryFilterCreator filterCreator = new SimpleQueryFilterCreator() - .withField("word"); - - MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper); - - //wordspout -> lookupbolt -> totalCountBolt - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout(WORD_SPOUT, spout, 1); - builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT); - builder.setBolt(TOTAL_COUNT_BOLT, totalBolt, 1).fieldsGrouping(LOOKUP_BOLT, new Fields("word")); - - String topoName = "test"; - if (args.length == 3) { - topoName = args[2]; - } else if (args.length > 3) { - System.out.println("Usage: LookupWordCount <mongodb url> <mongodb collection> [topology name]"); - return; - } - - Config config = new Config(); - StormSubmitter.submitTopology(topoName, config, builder.createTopology()); - } -} diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java deleted file mode 100644 index 148f2f914..000000000 --- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.topology; - -import static org.apache.storm.utils.Utils.tuple; - -import java.math.BigInteger; -import java.util.Map; -import java.util.Random; - -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.IBasicBolt; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TotalWordCounter implements IBasicBolt { - - private BigInteger total = BigInteger.ZERO; - private static final Logger LOG = LoggerFactory.getLogger(TotalWordCounter.class); - private static final Random RANDOM = new Random(); - - @Override - public void prepare(Map<String, Object> topoConf, TopologyContext context) { - } - - /* - * Just output the word value with a count of 1. - */ - @Override - public void execute(Tuple input, BasicOutputCollector collector) { - total = total.add(new BigInteger(input.getValues().get(1).toString())); - collector.emit(tuple(total.toString())); - //prints the total with low probability. - if (RANDOM.nextInt(1000) > 995) { - LOG.info("Running total = " + total); - } - } - - @Override - public void cleanup() { - LOG.info("Final total = " + total); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("total")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - -} diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java deleted file mode 100644 index 54205eab0..000000000 --- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.topology; - -import org.apache.storm.Config; -import org.apache.storm.StormSubmitter; -import org.apache.storm.mongodb.bolt.MongoUpdateBolt; -import org.apache.storm.mongodb.common.QueryFilterCreator; -import org.apache.storm.mongodb.common.SimpleQueryFilterCreator; -import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper; -import org.apache.storm.mongodb.common.mapper.SimpleMongoUpdateMapper; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; - -public class UpdateWordCount { - private static final String WORD_SPOUT = "WORD_SPOUT"; - private static final String COUNT_BOLT = "COUNT_BOLT"; - private static final String UPDATE_BOLT = "UPDATE_BOLT"; - - private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test"; - private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount"; - - - public static void main(String[] args) throws Exception { - String url = TEST_MONGODB_URL; - String collectionName = TEST_MONGODB_COLLECTION_NAME; - - if (args.length >= 2) { - url = args[0]; - collectionName = args[1]; - } - - WordSpout spout = new WordSpout(); - WordCounter bolt = new WordCounter(); - - MongoUpdateMapper mapper = new SimpleMongoUpdateMapper() - .withFields("word", "count"); - - QueryFilterCreator filterCreator = new SimpleQueryFilterCreator() - .withField("word"); - - MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, filterCreator, mapper); - - //if a new document should be inserted if there are no matches to the query filter - //updateBolt.withUpsert(true); - - //whether find all documents according to the query filter - //updateBolt.withMany(true); - - // wordSpout ==> countBolt ==> MongoUpdateBolt - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout(WORD_SPOUT, spout, 1); - builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT); - builder.setBolt(UPDATE_BOLT, updateBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word")); - - String topoName = "test"; - if (args.length == 3) { - topoName = args[2]; - } else if (args.length > 3) { - System.out.println("Usage: UpdateWordCount <mongodb url> <mongodb collection> [topology name]"); - return; - } - Config config = new Config(); - StormSubmitter.submitTopology(topoName, config, builder.createTopology()); - } -} diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java deleted file mode 100644 index 05c04bc29..000000000 --- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.topology; - -import com.google.common.collect.Maps; - -import java.util.Map; - -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.IBasicBolt; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; - -public class WordCounter implements IBasicBolt { - private Map<String, Integer> wordCounter = Maps.newHashMap(); - - @Override - public void prepare(Map<String, Object> topoConf, TopologyContext context) { - - } - - @Override - public void execute(Tuple input, BasicOutputCollector collector) { - String word = input.getStringByField("word"); - int count; - if (wordCounter.containsKey(word)) { - count = wordCounter.get(word) + 1; - wordCounter.put(word, wordCounter.get(word) + 1); - } else { - count = 1; - } - - wordCounter.put(word, count); - collector.emit(new Values(word, String.valueOf(count))); - } - - @Override - public void cleanup() { - - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - -} diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java deleted file mode 100644 index 96cab5e2a..000000000 --- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.topology; - -import java.util.Map; -import java.util.Random; -import java.util.UUID; - -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.IRichSpout; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -public class WordSpout implements IRichSpout { - boolean isDistributed; - SpoutOutputCollector collector; - public static final String[] words = new String[] { "apple", "orange", "pineapple", "banana", "watermelon" }; - - public WordSpout() { - this(true); - } - - public WordSpout(boolean isDistributed) { - this.isDistributed = isDistributed; - } - - public boolean isDistributed() { - return this.isDistributed; - } - - @Override - public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - this.collector = collector; - } - - @Override - public void close() { - - } - - @Override - public void nextTuple() { - final Random rand = new Random(); - final String word = words[rand.nextInt(words.length)]; - this.collector.emit(new Values(word), UUID.randomUUID()); - Thread.yield(); - } - - @Override - public void ack(Object msgId) { - - } - - @Override - public void fail(Object msgId) { - - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - - @Override - public void activate() { - } - - @Override - public void deactivate() { - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } -} diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java deleted file mode 100644 index 66410f381..000000000 --- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.trident; - -import java.util.Random; - -import org.apache.storm.trident.operation.BaseFunction; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.tuple.TridentTuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PrintFunction extends BaseFunction { - - private static final Logger LOG = LoggerFactory.getLogger(PrintFunction.class); - - private static final Random RANDOM = new Random(); - - @Override - public void execute(TridentTuple tuple, TridentCollector tridentCollector) { - if (RANDOM.nextInt(1000) > 995) { - LOG.info(tuple.toString()); - } - } -} diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java deleted file mode 100644 index 858ea532c..000000000 --- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.trident; - -import org.apache.storm.Config; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.mongodb.common.mapper.MongoMapper; -import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper; -import org.apache.storm.mongodb.trident.state.MongoState; -import org.apache.storm.mongodb.trident.state.MongoStateFactory; -import org.apache.storm.mongodb.trident.state.MongoStateQuery; -import org.apache.storm.mongodb.trident.state.MongoStateUpdater; -import org.apache.storm.trident.Stream; -import org.apache.storm.trident.TridentState; -import org.apache.storm.trident.TridentTopology; -import org.apache.storm.trident.state.StateFactory; -import org.apache.storm.trident.testing.FixedBatchSpout; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -public class WordCountTrident { - - public static StormTopology buildTopology(String url, String collectionName) { - Fields fields = new Fields("word", "count"); - FixedBatchSpout spout = new FixedBatchSpout(fields, 4, - new Values("storm", 1), - new Values("trident", 1), - new Values("needs", 1), - new Values("javadoc", 1) - ); - spout.setCycle(true); - - MongoMapper mapper = new SimpleMongoMapper() - .withFields("word", "count"); - - MongoState.Options options = new MongoState.Options() - .withUrl(url) - .withCollectionName(collectionName) - .withMapper(mapper); - - StateFactory factory = new MongoStateFactory(options); - - TridentTopology topology = new TridentTopology(); - Stream stream = topology.newStream("spout1", spout); - - stream.partitionPersist(factory, fields, - new MongoStateUpdater(), new Fields()); - - TridentState state = topology.newStaticState(factory); - stream = stream.stateQuery(state, new Fields("word"), - new MongoStateQuery(), new Fields("columnName", "columnValue")); - stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields()); - return topology.build(); - } - - public static void main(String[] args) throws Exception { - Config conf = new Config(); - conf.setMaxSpoutPending(5); - String topoName = "wordCounter"; - if (args.length == 3) { - topoName = args[2]; - } else if (args.length > 3 || args.length < 2) { - System.out.println("Usage: WordCountTrident <mongodb url> <mongodb collection> [topology name]"); - return; - } - conf.setNumWorkers(3); - StormSubmitter.submitTopology(topoName, conf, buildTopology(args[0], args[1])); - } - -} diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java deleted file mode 100644 index 18f79dc07..000000000 --- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.trident; - -import org.apache.storm.Config; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.mongodb.common.QueryFilterCreator; -import org.apache.storm.mongodb.common.SimpleQueryFilterCreator; -import org.apache.storm.mongodb.common.mapper.MongoMapper; -import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper; -import org.apache.storm.mongodb.trident.state.MongoMapState; -import org.apache.storm.trident.Stream; -import org.apache.storm.trident.TridentState; -import org.apache.storm.trident.TridentTopology; -import org.apache.storm.trident.operation.builtin.MapGet; -import org.apache.storm.trident.operation.builtin.Sum; -import org.apache.storm.trident.state.StateFactory; -import org.apache.storm.trident.testing.FixedBatchSpout; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -public class WordCountTridentMap { - - public static StormTopology buildTopology(String url, String collectionName) { - Fields fields = new Fields("word", "count"); - FixedBatchSpout spout = new FixedBatchSpout(fields, 4, - new Values("storm", 1), - new Values("trident", 1), - new Values("needs", 1), - new Values("javadoc", 1) - ); - spout.setCycle(true); - - MongoMapper mapper = new SimpleMongoMapper() - .withFields("word", "count"); - - MongoMapState.Options options = new MongoMapState.Options(); - options.url = url; - options.collectionName = collectionName; - options.mapper = mapper; - QueryFilterCreator filterCreator = new SimpleQueryFilterCreator() - .withField("word"); - options.queryCreator = filterCreator; - - StateFactory factory = MongoMapState.transactional(options); - - TridentTopology topology = new TridentTopology(); - Stream stream = topology.newStream("spout1", spout); - - TridentState state = stream.groupBy(new Fields("word")) - .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum")); - - stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum")) - .each(new Fields("word", "sum"), new PrintFunction(), new Fields()); - return topology.build(); - } - - public static void main(String[] args) throws Exception { - Config conf = new Config(); - conf.setMaxSpoutPending(5); - String topoName = "wordCounter"; - if (args.length == 3) { - topoName = args[2]; - } else if (args.length > 3 || args.length < 2) { - System.out.println("Usage: WordCountTrident <mongodb url> <mongodb collection> [topology name]"); - return; - } - conf.setNumWorkers(3); - StormSubmitter.submitTopology(topoName, conf, buildTopology(args[0], args[1])); - } - -} diff --git a/external/storm-mongodb/README.md b/external/storm-mongodb/README.md deleted file mode 100644 index 5c817107b..000000000 --- a/external/storm-mongodb/README.md +++ /dev/null @@ -1,325 +0,0 @@ -#Storm MongoDB - -Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute select/update queries against a database collection in a storm topology. - -## Insert into Database -The bolt and trident state included in this package for inserting data into a database collection. - -### MongoMapper -The main API for inserting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface: - -```java -public interface MongoMapper extends Serializable { - Document toDocument(ITuple tuple); - Document toDocumentByKeys(List<Object> keys); -} -``` - -### SimpleMongoMapper -`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoMapper` that can map Storm tuple to a Database document. `SimpleMongoMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to. - -```java -public class SimpleMongoMapper implements MongoMapper { - private String[] fields; - - @Override - public Document toDocument(ITuple tuple) { - Document document = new Document(); - for(String field : fields){ - document.append(field, tuple.getValueByField(field)); - } - return document; - } - - @Override - public Document toDocumentByKeys(List<Object> keys) { - Document document = new Document(); - document.append("_id", MongoUtils.getID(keys)); - return document; - } - - public SimpleMongoMapper withFields(String... fields) { - this.fields = fields; - return this; - } -} -``` - -### MongoInsertBolt -To use the `MongoInsertBolt`, you construct an instance of it by specifying url, collectionName and a `MongoMapper` implementation that converts storm tuple to DB document. The following is the standard URI connection scheme: - `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]` - -More options information(eg: Write Concern Options) about Mongo URI, you can visit https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options - - ```java -String url = "mongodb://127.0.0.1:27017/test"; -String collectionName = "wordcount"; - -MongoMapper mapper = new SimpleMongoMapper() - .withFields("word", "count"); - -MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper); - ``` - - -## Update from Database -The bolt included in this package for updating data from a database collection. - -### MongoUpdateMapper -The main API for updating data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface: - -```java -public interface MongoUpdateMapper extends MongoMapper { } -``` - -### SimpleMongoUpdateMapper -`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to. -`SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit -https://docs.mongodb.org/manual/reference/operator/update/ - -```java -public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper { - - private String[] fields; - - @Override - public Document toDocument(ITuple tuple) { - Document document = new Document(); - for(String field : fields){ - document.append(field, tuple.getValueByField(field)); - } - //$set operator: Sets the value of a field in a document. - return new Document("$set", document); - } - - public SimpleMongoUpdateMapper withFields(String... fields) { - this.fields = fields; - return this; - } -} -``` - - -### QueryFilterCreator -The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface: - - ```java -public interface QueryFilterCreator extends Serializable { - Bson createFilter(ITuple tuple); - Bson createFilterByKeys(List<Object> keys); -} - ``` - -### SimpleQueryFilterCreator -`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that can create a MongoDB query Filter by given Tuple. `QueryFilterCreator` uses `$eq` operator for matching values that are equal to a specified value. More information about query operator, you can visit -https://docs.mongodb.org/manual/reference/operator/query/ - - ```java -public class SimpleQueryFilterCreator implements QueryFilterCreator { - - private String field; - - @Override - public Bson createFilter(ITuple tuple) { - return Filters.eq(field, tuple.getValueByField(field)); - } - - @Override - public Bson createFilterByKeys(List<Object> keys) { - return Filters.eq("_id", MongoUtils.getID(keys)); - } - - public SimpleQueryFilterCreator withField(String field) { - this.field = field; - return this; - } - -} - ``` - -### MongoUpdateBolt -To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoUpdateMapper` implementation that converts storm tuple to DB document. - - ```java - MongoUpdateMapper mapper = new SimpleMongoUpdateMapper() - .withFields("word", "count"); - - QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator() - .withField("word"); - - MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper); - - //if a new document should be inserted if there are no matches to the query filter - //updateBolt.withUpsert(true); - - //whether find all documents according to the query filter - //updateBolt.withMany(true); - ``` - - Or use a anonymous inner class implementation for `QueryFilterCreator`: - - ```java - MongoUpdateMapper mapper = new SimpleMongoUpdateMapper() - .withFields("word", "count"); - - QueryFilterCreator updateQueryCreator = new QueryFilterCreator() { - @Override - public Bson createFilter(ITuple tuple) { - return Filters.gt("count", 3); - } - }; - - MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper); - - //if a new document should be inserted if there are no matches to the query filter - //updateBolt.withUpsert(true); - ``` - - -## Lookup from Database -The bolt included in this package for selecting data from a database collection. - -### MongoLookupMapper -The main API for selecting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface: - -```java -public interface MongoLookupMapper extends Serializable { - - List<Values> toTuple(ITuple input, Document doc); - - void declareOutputFields(OutputFieldsDeclarer declarer); -} -``` - -### SimpleMongoLookupMapper -`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation called `SimpleMongoLookupMapper` that can converts a Mongo document to a list of storm values. - -```java -public class SimpleMongoLookupMapper implements MongoLookupMapper { - - private String[] fields; - - @Override - public List<Values> toTuple(ITuple input, Document doc) { - Values values = new Values(); - - for(String field : fields) { - if(input.contains(field)) { - values.add(input.getValueByField(field)); - } else { - values.add(doc.get(field)); - } - } - List<Values> result = new ArrayList<Values>(); - result.add(values); - return result; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(fields)); - } - - public SimpleMongoLookupMapper withFields(String... fields) { - this.fields = fields; - return this; - } - -} -``` - -### MongoLookupBolt -To use the `MongoLookupBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoLookupMapper` implementation that converts a Mongo document to a list of storm values. - - ```java - MongoLookupMapper mapper = new SimpleMongoLookupMapper() - .withFields("word", "count"); - - QueryFilterCreator filterCreator = new SimpleQueryFilterCreator() - .withField("word"); - - MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper); - ``` - -## Mongo Trident State&MapState -### Trident State -We support trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below: - - ```java - MongoMapper mapper = new SimpleMongoMapper() - .withFields("word", "count"); - - MongoState.Options options = new MongoState.Options() - .withUrl(url) - .withCollectionName(collectionName) - .withMapper(mapper); - - StateFactory factory = new MongoStateFactory(options); - - TridentTopology topology = new TridentTopology(); - Stream stream = topology.newStream("spout1", spout); - - stream.partitionPersist(factory, fields, - new MongoStateUpdater(), new Fields()); - - TridentState state = topology.newStaticState(factory); - stream = stream.stateQuery(state, new Fields("word"), - new MongoStateQuery(), new Fields("columnName", "columnValue")); - stream.each(new Fields("word", "columnValue"), new PrintFunction(), new Fields()); - ``` - **NOTE**: - >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents. - -### Trident MapState -We also support trident `MapState`. To create a Mongo trident `MapState` you need to initialize it with the url, collectionName, the `MongoMapper` and `QueryFilterCreator` instance. See the example below: - - ```java - MongoMapper mapper = new SimpleMongoMapper() - .withFields("word", "count"); - - QueryFilterCreator filterCreator = new SimpleQueryFilterCreator() - .withField("word"); - - MongoMapState.Options options = new MongoMapState.Options(); - options.url = url; - options.collectionName = collectionName; - options.mapper = mapper; - options.queryCreator = filterCreator; - - StateFactory factory = MongoMapState.transactional(options); - - TridentTopology topology = new TridentTopology(); - Stream stream = topology.newStream("spout1", spout); - - TridentState state = stream.groupBy(new Fields("word")) - .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum")); - - stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum")) - .each(new Fields("word", "sum"), new PrintFunction(), new Fields()); - ``` - - -## License - -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. - -## Committer Sponsors - - * Sriharsha Chintalapani ([[email protected]](mailto:[email protected])) - * Xin Wang ([[email protected]](mailto:[email protected])) - diff --git a/external/storm-mongodb/pom.xml b/external/storm-mongodb/pom.xml deleted file mode 100644 index 66bf9fb9e..000000000 --- a/external/storm-mongodb/pom.xml +++ /dev/null @@ -1,82 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>storm</artifactId> - <groupId>org.apache.storm</groupId> - <version>2.6.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>storm-mongodb</artifactId> - - <developers> - <developer> - <id>vesense</id> - <name>Xin Wang</name> - <email>[email protected]</email> - </developer> - </developers> - - <dependencies> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <scope>${provided.scope}</scope> - </dependency> - <dependency> - <groupId>org.mongodb</groupId> - <artifactId>mongo-java-driver</artifactId> - <version>${mongodb.version}</version> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - </dependency> - <!--test dependencies --> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - </dependency> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest</artifactId> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <!--Note - the version would be inherited--> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-pmd-plugin</artifactId> - </plugin> - </plugins> - </build> -</project> diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java deleted file mode 100644 index 13049de8b..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.bolt; - -import java.util.Map; - -import org.apache.commons.lang.Validate; -import org.apache.storm.mongodb.common.MongoDbClient; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.base.BaseRichBolt; - -public abstract class AbstractMongoBolt extends BaseRichBolt { - - private String url; - private String collectionName; - - protected OutputCollector collector; - protected MongoDbClient mongoClient; - - /** - * AbstractMongoBolt Constructor. - * @param url The MongoDB server url - * @param collectionName The collection where reading/writing data - */ - public AbstractMongoBolt(String url, String collectionName) { - Validate.notEmpty(url, "url can not be blank or null"); - Validate.notEmpty(collectionName, "collectionName can not be blank or null"); - - this.url = url; - this.collectionName = collectionName; - } - - @Override - public void prepare(Map<String, Object> topoConf, TopologyContext context, - OutputCollector collector) { - this.collector = collector; - this.mongoClient = new MongoDbClient(url, collectionName); - } - - @Override - public void cleanup() { - this.mongoClient.close(); - } - -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java deleted file mode 100644 index 5d233dacb..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.bolt; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang.Validate; -import org.apache.storm.mongodb.common.mapper.MongoMapper; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.BatchHelper; -import org.apache.storm.utils.TupleUtils; -import org.bson.Document; - -/** - * Basic bolt for writing to MongoDB. - * Note: Each MongoInsertBolt defined in a topology is tied to a specific collection. - */ -public class MongoInsertBolt extends AbstractMongoBolt { - - private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1; - - private MongoMapper mapper; - - private boolean ordered = true; //default is ordered. - - private int batchSize; - - private BatchHelper batchHelper; - - private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS; - - /** - * MongoInsertBolt Constructor. - * @param url The MongoDB server url - * @param collectionName The collection where reading/writing data - * @param mapper MongoMapper converting tuple to an MongoDB document - */ - public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) { - super(url, collectionName); - - Validate.notNull(mapper, "MongoMapper can not be null"); - - this.mapper = mapper; - } - - @Override - public void execute(Tuple tuple) { - try { - if (batchHelper.shouldHandle(tuple)) { - batchHelper.addBatch(tuple); - } - - if (batchHelper.shouldFlush()) { - flushTuples(); - batchHelper.ack(); - } - } catch (Exception e) { - batchHelper.fail(e); - } - } - - private void flushTuples() { - List<Document> docs = new LinkedList<>(); - for (Tuple t : batchHelper.getBatchTuples()) { - Document doc = mapper.toDocument(t); - docs.add(doc); - } - mongoClient.insert(docs, ordered); - } - - public MongoInsertBolt withBatchSize(int batchSize) { - this.batchSize = batchSize; - return this; - } - - public MongoInsertBolt withOrdered(boolean ordered) { - this.ordered = ordered; - return this; - } - - public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) { - this.flushIntervalSecs = flushIntervalSecs; - return this; - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs); - } - - @Override - public void prepare(Map<String, Object> topoConf, TopologyContext context, - OutputCollector collector) { - super.prepare(topoConf, context, collector); - this.batchHelper = new BatchHelper(batchSize, collector); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } - -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java deleted file mode 100644 index 39e1512da..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.bolt; - -import java.util.List; -import org.apache.commons.lang.Validate; -import org.apache.storm.mongodb.common.QueryFilterCreator; -import org.apache.storm.mongodb.common.mapper.MongoLookupMapper; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.TupleUtils; -import org.bson.Document; -import org.bson.conversions.Bson; - -/** - * Basic bolt for querying from MongoDB. - * Note: Each MongoLookupBolt defined in a topology is tied to a specific collection. - */ -public class MongoLookupBolt extends AbstractMongoBolt { - - private QueryFilterCreator queryCreator; - private MongoLookupMapper mapper; - - /** - * MongoLookupBolt Constructor. - * @param url The MongoDB server url - * @param collectionName The collection where reading/writing data - * @param queryCreator QueryFilterCreator - * @param mapper MongoMapper converting tuple to an MongoDB document - */ - public MongoLookupBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoLookupMapper mapper) { - super(url, collectionName); - - Validate.notNull(queryCreator, "QueryFilterCreator can not be null"); - Validate.notNull(mapper, "MongoLookupMapper can not be null"); - - this.queryCreator = queryCreator; - this.mapper = mapper; - } - - @Override - public void execute(Tuple tuple) { - if (TupleUtils.isTick(tuple)) { - return; - } - - try { - //get query filter - Bson filter = queryCreator.createFilter(tuple); - //find document from mongodb - Document doc = mongoClient.find(filter); - //get storm values and emit - List<Values> valuesList = mapper.toTuple(tuple, doc); - for (Values values : valuesList) { - this.collector.emit(tuple, values); - } - this.collector.ack(tuple); - } catch (Exception e) { - this.collector.reportError(e); - this.collector.fail(tuple); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - mapper.declareOutputFields(declarer); - } - -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java deleted file mode 100644 index 4f92b3254..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.bolt; - -import org.apache.commons.lang.Validate; -import org.apache.storm.mongodb.common.QueryFilterCreator; -import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.TupleUtils; -import org.bson.Document; -import org.bson.conversions.Bson; - -/** - * Basic bolt for updating from MongoDB. - * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection. - */ -public class MongoUpdateBolt extends AbstractMongoBolt { - - private QueryFilterCreator queryCreator; - private MongoUpdateMapper mapper; - - private boolean upsert; //the default is false. - private boolean many; //the default is false. - - /** - * MongoUpdateBolt Constructor. - * @param url The MongoDB server url - * @param collectionName The collection where reading/writing data - * @param queryCreator QueryFilterCreator - * @param mapper MongoMapper converting tuple to an MongoDB document - */ - public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoUpdateMapper mapper) { - super(url, collectionName); - - Validate.notNull(queryCreator, "QueryFilterCreator can not be null"); - Validate.notNull(mapper, "MongoUpdateMapper can not be null"); - - this.queryCreator = queryCreator; - this.mapper = mapper; - } - - @Override - public void execute(Tuple tuple) { - if (TupleUtils.isTick(tuple)) { - return; - } - - try { - //get document - Document doc = mapper.toDocument(tuple); - //get query filter - Bson filter = queryCreator.createFilter(tuple); - mongoClient.update(filter, doc, upsert, many); - this.collector.ack(tuple); - } catch (Exception e) { - this.collector.reportError(e); - this.collector.fail(tuple); - } - } - - public MongoUpdateBolt withUpsert(boolean upsert) { - this.upsert = upsert; - return this; - } - - public MongoUpdateBolt withMany(boolean many) { - this.many = many; - return this; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } - -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java deleted file mode 100644 index 52ed2373c..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.common; - -import com.mongodb.MongoClient; -import com.mongodb.MongoClientURI; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoDatabase; -import com.mongodb.client.model.InsertManyOptions; -import com.mongodb.client.model.UpdateOptions; - -import java.util.List; - -import org.bson.Document; -import org.bson.conversions.Bson; - -public class MongoDbClient { - - private MongoClient client; - private MongoCollection<Document> collection; - - /** - * The MongoDbClient constructor. - * @param url The Mongo server url - * @param collectionName The Mongo collection to read/write data - */ - public MongoDbClient(String url, String collectionName) { - //Creates a MongoURI from the given string. - MongoClientURI uri = new MongoClientURI(url); - //Creates a MongoClient described by a URI. - this.client = new MongoClient(uri); - //Gets a Database. - MongoDatabase db = client.getDatabase(uri.getDatabase()); - //Gets a collection. - this.collection = db.getCollection(collectionName); - } - - /** - * Inserts one or more documents. - * This method is equivalent to a call to the bulkWrite method. - * The documents will be inserted in the order provided, - * stopping on the first failed insertion. - * - * @param documents documents - */ - public void insert(List<Document> documents, boolean ordered) { - InsertManyOptions options = new InsertManyOptions(); - if (!ordered) { - options.ordered(false); - } - collection.insertMany(documents, options); - } - - /** - * Update a single or all documents in the collection according to the specified arguments. - * When upsert set to true, the new document will be inserted if there are no matches to the query filter. - * - * @param filter Bson filter - * @param document Bson document - * @param upsert a new document should be inserted if there are no matches to the query filter - * @param many whether find all documents according to the query filter - */ - public void update(Bson filter, Bson document, boolean upsert, boolean many) { - //TODO batch updating - UpdateOptions options = new UpdateOptions(); - if (upsert) { - options.upsert(true); - } - if (many) { - collection.updateMany(filter, document, options); - } else { - collection.updateOne(filter, document, options); - } - } - - /** - * Finds a single document in the collection according to the specified arguments. - * - * @param filter Bson filter - */ - public Document find(Bson filter) { - //TODO batch finding - return collection.find(filter).first(); - } - - /** - * Closes all resources associated with this instance. - */ - public void close() { - client.close(); - } - -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java deleted file mode 100644 index 3ddd4d940..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.common; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.List; - -public final class MongoUtils { - - /** - * Create Mongo _id based on input keys. - * @param keys keys - * @return Mongo _id - */ - public static byte[] getId(List<Object> keys) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try { - for (Object key : keys) { - bos.write(String.valueOf(key).getBytes()); - } - bos.close(); - } catch (IOException e) { - throw new RuntimeException("IOException creating Mongo document _id.", e); - } - return bos.toByteArray(); - } -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java deleted file mode 100644 index 521678e57..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.common; - -import java.io.Serializable; -import java.util.List; - -import org.apache.storm.tuple.ITuple; -import org.bson.conversions.Bson; - -/** - * Create a MongoDB query Filter by given Tuple/trident keys. - */ -public interface QueryFilterCreator extends Serializable { - - /** - * Create a query Filter by given Tuple. - * - * @param tuple ITuple tuple - * @return query Filter - */ - Bson createFilter(ITuple tuple); - - /** - * Create a query Filter by given trident keys. - * - * @param keys keys - * @return query Filter - */ - Bson createFilterByKeys(List<Object> keys); -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java deleted file mode 100644 index 16a71ec60..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.common; - -import com.mongodb.client.model.Filters; - -import java.util.List; - -import org.apache.storm.tuple.ITuple; -import org.bson.conversions.Bson; - -public class SimpleQueryFilterCreator implements QueryFilterCreator { - - private String field; - - @Override - public Bson createFilter(ITuple tuple) { - return Filters.eq(field, tuple.getValueByField(field)); - } - - @Override - public Bson createFilterByKeys(List<Object> keys) { - return Filters.eq("_id", MongoUtils.getId(keys)); - } - - public SimpleQueryFilterCreator withField(String field) { - this.field = field; - return this; - } - -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java deleted file mode 100644 index 0d17820d3..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.common.mapper; - -import java.io.Serializable; -import java.util.List; - -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.ITuple; -import org.apache.storm.tuple.Values; -import org.bson.Document; - -public interface MongoLookupMapper extends Serializable { - - /** - * Converts a Mongo document to a list of storm values that can be emitted. This is done to allow a single - * storm input tuple and a single Mongo document to result in multiple output values. - * @param input the input tuple. - * @param doc the mongo document - * @return a List of storm values that can be emitted. Each item in list is emitted as an output tuple. - */ - List<Values> toTuple(ITuple input, Document doc); - - /** - * declare what are the fields that this code will output. - * @param declarer OutputFieldsDeclarer - */ - void declareOutputFields(OutputFieldsDeclarer declarer); -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java deleted file mode 100644 index ed27c9269..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.common.mapper; - -import java.io.Serializable; -import java.util.List; - -import org.apache.storm.tuple.ITuple; -import org.bson.Document; - -/** - * Given a Tuple/trident keys, converts it to an MongoDB document. - */ -public interface MongoMapper extends Serializable { - - /** - * Converts a Tuple to a Document. - * - * @param tuple the incoming tuple - * @return the MongoDB document - */ - Document toDocument(ITuple tuple); - - /** - * Converts a keys to a Document. - * - * @param keys the trident keys - * @return the MongoDB document - */ - Document toDocumentByKeys(List<Object> keys); -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java deleted file mode 100644 index 6dcee15c2..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.common.mapper; - -/** - * MongoUpdateMapper is for defining spec. which is used for converting Tuple/ trident keys to an MongoDB document. - */ -public interface MongoUpdateMapper extends MongoMapper { - -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java deleted file mode 100644 index 88407c35e..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.common.mapper; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.ITuple; -import org.apache.storm.tuple.Values; -import org.bson.Document; - -public class SimpleMongoLookupMapper implements MongoLookupMapper { - - private String[] fields; - - public SimpleMongoLookupMapper(String... fields) { - this.fields = fields; - } - - @Override - public List<Values> toTuple(ITuple input, Document doc) { - Values values = new Values(); - - for (String field : fields) { - if (input.contains(field)) { - values.add(input.getValueByField(field)); - } else { - values.add(doc.get(field)); - } - } - List<Values> result = new ArrayList<Values>(); - result.add(values); - return result; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(fields)); - } - - public SimpleMongoLookupMapper withFields(String... fields) { - this.fields = fields; - return this; - } - -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java deleted file mode 100644 index 1a3882821..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.common.mapper; - -import java.util.List; - -import org.apache.storm.mongodb.common.MongoUtils; -import org.apache.storm.tuple.ITuple; -import org.bson.Document; - -public class SimpleMongoMapper implements MongoMapper { - - private String[] fields; - - public SimpleMongoMapper(String... fields) { - this.fields = fields; - } - - @Override - public Document toDocument(ITuple tuple) { - Document document = new Document(); - for (String field : fields) { - document.append(field, tuple.getValueByField(field)); - } - return document; - } - - @Override - public Document toDocumentByKeys(List<Object> keys) { - Document document = new Document(); - document.append("_id", MongoUtils.getId(keys)); - return document; - } - - public SimpleMongoMapper withFields(String... fields) { - this.fields = fields; - return this; - } -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java deleted file mode 100644 index 1c9cd41ef..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.common.mapper; - -import org.apache.storm.tuple.ITuple; -import org.bson.Document; - -public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoUpdateMapper { - - private String[] fields; - - public SimpleMongoUpdateMapper(String... fields) { - this.fields = fields; - } - - @Override - public Document toDocument(ITuple tuple) { - Document document = new Document(); - for (String field : fields) { - document.append(field, tuple.getValueByField(field)); - } - //$set operator: Sets the value of a field in a document. - return new Document("$set", document); - } - - @Override - public SimpleMongoUpdateMapper withFields(String... fields) { - this.fields = fields; - return this; - } -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java deleted file mode 100644 index f6cfc4341..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.trident.state; - -import com.google.common.collect.Maps; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang.Validate; -import org.apache.storm.mongodb.common.MongoDbClient; -import org.apache.storm.mongodb.common.QueryFilterCreator; -import org.apache.storm.mongodb.common.mapper.MongoMapper; -import org.apache.storm.task.IMetricsContext; -import org.apache.storm.topology.FailedException; -import org.apache.storm.trident.state.JSONNonTransactionalSerializer; -import org.apache.storm.trident.state.JSONOpaqueSerializer; -import org.apache.storm.trident.state.JSONTransactionalSerializer; -import org.apache.storm.trident.state.OpaqueValue; -import org.apache.storm.trident.state.Serializer; -import org.apache.storm.trident.state.State; -import org.apache.storm.trident.state.StateFactory; -import org.apache.storm.trident.state.StateType; -import org.apache.storm.trident.state.TransactionalValue; -import org.apache.storm.trident.state.map.CachedMap; -import org.apache.storm.trident.state.map.IBackingMap; -import org.apache.storm.trident.state.map.MapState; -import org.apache.storm.trident.state.map.NonTransactionalMap; -import org.apache.storm.trident.state.map.OpaqueMap; -import org.apache.storm.trident.state.map.SnapshottableMap; -import org.apache.storm.trident.state.map.TransactionalMap; -import org.apache.storm.tuple.Values; -import org.bson.Document; -import org.bson.conversions.Bson; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MongoMapState<T> implements IBackingMap<T> { - private static Logger LOG = LoggerFactory.getLogger(MongoMapState.class); - - @SuppressWarnings("rawtypes") - private static final Map<StateType, Serializer> DEFAULT_SERIALZERS = Maps.newHashMap(); - - static { - DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer()); - DEFAULT_SERIALZERS.put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer()); - DEFAULT_SERIALZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer()); - } - - private Options<T> options; - private Serializer<T> serializer; - private MongoDbClient mongoClient; - private Map<String, Object> map; - - protected MongoMapState(Map<String, Object> map, Options options) { - this.options = options; - this.map = map; - this.serializer = options.serializer; - - Validate.notEmpty(options.url, "url can not be blank or null"); - Validate.notEmpty(options.collectionName, "collectionName can not be blank or null"); - Validate.notNull(options.queryCreator, "queryCreator can not be null"); - Validate.notNull(options.mapper, "mapper can not be null"); - - this.mongoClient = new MongoDbClient(options.url, options.collectionName); - } - - public static class Options<T> implements Serializable { - public String url; - public String collectionName; - public MongoMapper mapper; - public QueryFilterCreator queryCreator; - public Serializer<T> serializer; - public int cacheSize = 5000; - public String globalKey = "$MONGO-MAP-STATE-GLOBAL"; - public String serDocumentField = "tridentSerField"; - } - - - @SuppressWarnings("rawtypes") - public static StateFactory opaque() { - Options<OpaqueValue> options = new Options<OpaqueValue>(); - return opaque(options); - } - - @SuppressWarnings("rawtypes") - public static StateFactory opaque(Options<OpaqueValue> opts) { - - return new Factory(StateType.OPAQUE, opts); - } - - @SuppressWarnings("rawtypes") - public static StateFactory transactional() { - Options<TransactionalValue> options = new Options<TransactionalValue>(); - return transactional(options); - } - - @SuppressWarnings("rawtypes") - public static StateFactory transactional(Options<TransactionalValue> opts) { - return new Factory(StateType.TRANSACTIONAL, opts); - } - - public static StateFactory nonTransactional() { - Options<Object> options = new Options<Object>(); - return nonTransactional(options); - } - - public static StateFactory nonTransactional(Options<Object> opts) { - return new Factory(StateType.NON_TRANSACTIONAL, opts); - } - - - protected static class Factory implements StateFactory { - private StateType stateType; - private Options options; - - @SuppressWarnings({"rawtypes", "unchecked"}) - public Factory(StateType stateType, Options options) { - this.stateType = stateType; - this.options = options; - - if (this.options.serializer == null) { - this.options.serializer = DEFAULT_SERIALZERS.get(stateType); - } - - if (this.options.serializer == null) { - throw new RuntimeException("Serializer should be specified for type: " + stateType); - } - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - @Override - public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - IBackingMap state = new MongoMapState(conf, options); - - if (options.cacheSize > 0) { - state = new CachedMap(state, options.cacheSize); - } - - MapState mapState; - switch (stateType) { - case NON_TRANSACTIONAL: - mapState = NonTransactionalMap.build(state); - break; - case OPAQUE: - mapState = OpaqueMap.build(state); - break; - case TRANSACTIONAL: - mapState = TransactionalMap.build(state); - break; - default: - throw new IllegalArgumentException("Unknown state type: " + stateType); - } - return new SnapshottableMap(mapState, new Values(options.globalKey)); - } - - } - - @Override - public List<T> multiGet(List<List<Object>> keysList) { - List<T> retval = new ArrayList<>(); - try { - for (List<Object> keys : keysList) { - Bson filter = options.queryCreator.createFilterByKeys(keys); - Document doc = mongoClient.find(filter); - if (doc != null) { - retval.add(this.serializer.deserialize((byte[]) doc.get(options.serDocumentField))); - } else { - retval.add(null); - } - } - } catch (Exception e) { - LOG.warn("Batch get operation failed.", e); - throw new FailedException(e); - } - return retval; - } - - @Override - public void multiPut(List<List<Object>> keysList, List<T> values) { - try { - for (int i = 0; i < keysList.size(); i++) { - List<Object> keys = keysList.get(i); - T value = values.get(i); - Bson filter = options.queryCreator.createFilterByKeys(keys); - Document document = options.mapper.toDocumentByKeys(keys); - document.append(options.serDocumentField, this.serializer.serialize(value)); - this.mongoClient.update(filter, document, true, false); - } - } catch (Exception e) { - LOG.warn("Batch write operation failed.", e); - throw new FailedException(e); - } - } -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java deleted file mode 100644 index 100f9313a..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.trident.state; - -import com.google.common.collect.Lists; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang.Validate; -import org.apache.storm.mongodb.common.MongoDbClient; -import org.apache.storm.mongodb.common.QueryFilterCreator; -import org.apache.storm.mongodb.common.mapper.MongoLookupMapper; -import org.apache.storm.mongodb.common.mapper.MongoMapper; -import org.apache.storm.topology.FailedException; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.state.State; -import org.apache.storm.trident.tuple.TridentTuple; -import org.apache.storm.tuple.Values; -import org.bson.Document; -import org.bson.conversions.Bson; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MongoState implements State { - - private static final Logger LOG = LoggerFactory.getLogger(MongoState.class); - - private Options options; - private MongoDbClient mongoClient; - private Map<String, Object> map; - - protected MongoState(Map<String, Object> map, Options options) { - this.options = options; - this.map = map; - } - - public static class Options implements Serializable { - private String url; - private String collectionName; - private MongoMapper mapper; - private MongoLookupMapper lookupMapper; - private QueryFilterCreator queryCreator; - - public Options withUrl(String url) { - this.url = url; - return this; - } - - public Options withCollectionName(String collectionName) { - this.collectionName = collectionName; - return this; - } - - public Options withMapper(MongoMapper mapper) { - this.mapper = mapper; - return this; - } - - public Options withMongoLookupMapper(MongoLookupMapper lookupMapper) { - this.lookupMapper = lookupMapper; - return this; - } - - public Options withQueryFilterCreator(QueryFilterCreator queryCreator) { - this.queryCreator = queryCreator; - return this; - } - } - - protected void prepare() { - Validate.notEmpty(options.url, "url can not be blank or null"); - Validate.notEmpty(options.collectionName, "collectionName can not be blank or null"); - - this.mongoClient = new MongoDbClient(options.url, options.collectionName); - } - - @Override - public void beginCommit(Long txid) { - LOG.debug("beginCommit is noop."); - } - - @Override - public void commit(Long txid) { - LOG.debug("commit is noop."); - } - - /** - * Update Mongo state. - * @param tuples trident tuples - * @param collector trident collector - */ - public void updateState(List<TridentTuple> tuples, TridentCollector collector) { - List<Document> documents = Lists.newArrayList(); - for (TridentTuple tuple : tuples) { - Document document = options.mapper.toDocument(tuple); - documents.add(document); - } - - try { - this.mongoClient.insert(documents, true); - } catch (Exception e) { - LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e); - throw new FailedException(e); - } - } - - /** - * Batch retrieve values. - * @param tridentTuples trident tuples - * @return values - */ - public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) { - List<List<Values>> batchRetrieveResult = Lists.newArrayList(); - try { - for (TridentTuple tuple : tridentTuples) { - Bson filter = options.queryCreator.createFilter(tuple); - Document doc = mongoClient.find(filter); - List<Values> values = options.lookupMapper.toTuple(tuple, doc); - batchRetrieveResult.add(values); - } - } catch (Exception e) { - LOG.warn("Batch get operation failed. Triggering replay.", e); - throw new FailedException(e); - } - return batchRetrieveResult; - } -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java deleted file mode 100644 index d27d9a92d..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.trident.state; - -import java.util.Map; - -import org.apache.storm.task.IMetricsContext; -import org.apache.storm.trident.state.State; -import org.apache.storm.trident.state.StateFactory; - -public class MongoStateFactory implements StateFactory { - - private MongoState.Options options; - - public MongoStateFactory(MongoState.Options options) { - this.options = options; - } - - @Override - public State makeState(Map<String, Object> conf, IMetricsContext metrics, - int partitionIndex, int numPartitions) { - MongoState state = new MongoState(conf, options); - state.prepare(); - return state; - } - -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java deleted file mode 100644 index 2cf46cff2..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.trident.state; - -import java.util.List; - -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.state.BaseQueryFunction; -import org.apache.storm.trident.tuple.TridentTuple; -import org.apache.storm.tuple.Values; - -public class MongoStateQuery extends BaseQueryFunction<MongoState, List<Values>> { - - @Override - public List<List<Values>> batchRetrieve(MongoState mongoState, List<TridentTuple> tridentTuples) { - return mongoState.batchRetrieve(tridentTuples); - } - - @Override - public void execute(TridentTuple tuples, List<Values> values, TridentCollector tridentCollector) { - for (Values value : values) { - tridentCollector.emit(value); - } - } -} diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java deleted file mode 100644 index 41311b3c3..000000000 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mongodb.trident.state; - -import java.util.List; - -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.state.BaseStateUpdater; -import org.apache.storm.trident.tuple.TridentTuple; - -public class MongoStateUpdater extends BaseStateUpdater<MongoState> { - - @Override - public void updateState(MongoState state, List<TridentTuple> tuples, - TridentCollector collector) { - state.updateState(tuples, collector); - } - -} diff --git a/pom.xml b/pom.xml index b460324d4..8d04950e1 100644 --- a/pom.xml +++ b/pom.xml @@ -496,7 +496,6 @@ <module>external/storm-solr</module> <module>external/storm-metrics</module> <module>external/storm-mqtt</module> - <module>external/storm-mongodb</module> <module>external/storm-kafka-client</module> <module>external/storm-kafka-migration</module> <module>external/storm-opentsdb</module> @@ -520,7 +519,6 @@ <modules> <module>examples/storm-starter</module> <module>examples/storm-loadgen</module> - <module>examples/storm-mongodb-examples</module> <module>examples/storm-redis-examples</module> <module>examples/storm-opentsdb-examples</module> <module>examples/storm-solr-examples</module>
