Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2641#discussion_r183402775 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/BlobStoreAPIWordCountTopology.java --- @@ -38,176 +47,66 @@ import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.topology.base.BaseRichSpout; -import org.apache.storm.blobstore.BlobStoreAclHandler; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.StringTokenizer; - public class BlobStoreAPIWordCountTopology { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class); private static ClientBlobStore store; // Client API to invoke blob store API functionality private static String key = "key"; private static String fileName = "blacklist.txt"; - private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class); public static void prepare() { Config conf = new Config(); conf.putAll(Utils.readStormConfig()); store = Utils.getClientBlobStore(conf); } - // Spout implementation - public static class RandomSentenceSpout extends BaseRichSpout { - SpoutOutputCollector _collector; - - @Override - public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - } - - @Override - public void nextTuple() { - Utils.sleep(100); - _collector.emit(new Values(getRandomSentence())); - } - - @Override - public void ack(Object id) { - } - - @Override - public void fail(Object id) { - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("sentence")); - } - - } - - // Bolt implementation - public static class SplitSentence extends ShellBolt implements IRichBolt { - - public SplitSentence() { - super("python", "splitsentence.py"); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - } - - public static class FilterWords extends BaseBasicBolt { - boolean poll = false; - long pollTime; - Set<String> wordSet; - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String word = tuple.getString(0); - // Thread Polling every 5 seconds to update the wordSet seconds which is - // used in FilterWords bolt to filter the words - try { - if (!poll) { - wordSet = parseFile(fileName); - pollTime = System.currentTimeMillis(); - poll = true; - } else { - if ((System.currentTimeMillis() - pollTime) > 5000) { - wordSet = parseFile(fileName); - pollTime = System.currentTimeMillis(); - } - } - } catch (IOException exp) { - throw new RuntimeException(exp); - } - if (wordSet !=null && !wordSet.contains(word)) { - collector.emit(new Values(word)); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - } - - public void buildAndLaunchWordCountTopology(String[] args) { - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("spout", new RandomSentenceSpout(), 5); - builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); - builder.setBolt("filter", new FilterWords(), 6).shuffleGrouping("split"); - - Config conf = new Config(); - conf.setDebug(true); - try { - conf.setNumWorkers(3); - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); - } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException exp) { - throw new RuntimeException(exp); - } - } - --- End diff -- Why are we moving locations of the functions
---