[ https://issues.apache.org/jira/browse/STORM-1373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15054416#comment-15054416 ]
ASF GitHub Bot commented on STORM-1373: --------------------------------------- Github user redsanket commented on a diff in the pull request: https://github.com/apache/storm/pull/934#discussion_r47433889 --- Diff: examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package storm.starter; + +import backtype.storm.Config; +import backtype.storm.StormSubmitter; +import backtype.storm.LocalCluster; +import backtype.storm.blobstore.AtomicOutputStream; +import backtype.storm.blobstore.ClientBlobStore; +import backtype.storm.blobstore.InputStreamWithMeta; +import backtype.storm.blobstore.NimbusBlobStore; + +import backtype.storm.generated.AccessControl; +import backtype.storm.generated.AccessControlType; +import backtype.storm.generated.AlreadyAliveException; +import backtype.storm.generated.AuthorizationException; +import backtype.storm.generated.InvalidTopologyException; +import backtype.storm.generated.KeyAlreadyExistsException; +import backtype.storm.generated.KeyNotFoundException; +import backtype.storm.generated.SettableBlobMeta; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.ShellBolt; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.BasicOutputCollector; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseBasicBolt; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +public class BlobStoreAPIWordCountTopology { + private static NimbusBlobStore store = new NimbusBlobStore(); // Client API to invoke blob store API functionality + private static String key = "key1"; + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class); + private static final int READ = 0x01; + 
private static final int WRITE = 0x02; + private static final int ADMIN = 0x04; + private static final List<AccessControl> WORLD_EVERYTHING = + Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN)); + + // Spout implementation + public static class RandomSentenceSpout extends BaseRichSpout { + SpoutOutputCollector _collector; + BlobStoreAPIWordCountTopology wc; + String key; + NimbusBlobStore store; + + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + _collector = collector; + wc = new BlobStoreAPIWordCountTopology(); + key = "key1"; + store = new NimbusBlobStore(); + store.prepare(Utils.readStormConfig()); + } + + @Override + public void nextTuple() { + Utils.sleep(100); + try { + _collector.emit(new Values(wc.getBlobContent(key, store))); + } catch (AuthorizationException | KeyNotFoundException | IOException exp) { + throw new RuntimeException(exp); + } + } + + @Override + public void ack(Object id) { + } + + @Override + public void fail(Object id) { + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); --- End diff -- I guess you meant that the spout part should be @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("sentence")); + } rather than the bolt, which splits the sentence. I made the changes; thanks for the catch. > Sample Topology making use of BlobStore API > ------------------------------------------- > > Key: STORM-1373 > URL: https://issues.apache.org/jira/browse/STORM-1373 > Project: Apache Storm > Issue Type: Story > Reporter: Sanket Reddy > Priority: Minor > -- This message was sent by Atlassian JIRA (v6.3.4#6332)