[ 
https://issues.apache.org/jira/browse/STORM-1373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15054416#comment-15054416
 ] 

ASF GitHub Bot commented on STORM-1373:
---------------------------------------

Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/storm/pull/934#discussion_r47433889
  
    --- Diff: examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java ---
    @@ -0,0 +1,248 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package storm.starter;
    +
    +import backtype.storm.Config;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.LocalCluster;
    +import backtype.storm.blobstore.AtomicOutputStream;
    +import backtype.storm.blobstore.ClientBlobStore;
    +import backtype.storm.blobstore.InputStreamWithMeta;
    +import backtype.storm.blobstore.NimbusBlobStore;
    +
    +import backtype.storm.generated.AccessControl;
    +import backtype.storm.generated.AccessControlType;
    +import backtype.storm.generated.AlreadyAliveException;
    +import backtype.storm.generated.AuthorizationException;
    +import backtype.storm.generated.InvalidTopologyException;
    +import backtype.storm.generated.KeyAlreadyExistsException;
    +import backtype.storm.generated.KeyNotFoundException;
    +import backtype.storm.generated.SettableBlobMeta;
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.ShellBolt;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.BasicOutputCollector;
    +import backtype.storm.topology.IRichBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.topology.base.BaseBasicBolt;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Random;
    +
    +public class BlobStoreAPIWordCountTopology {
    +    private static NimbusBlobStore store = new NimbusBlobStore(); // 
Client API to invoke blob store API functionality
    +    private static String key = "key1";
    +    private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
    +    private static final int READ = 0x01;
    +    private static final int WRITE = 0x02;
    +    private static final int ADMIN = 0x04;
    +    private static final List<AccessControl> WORLD_EVERYTHING =
    +            Arrays.asList(new AccessControl(AccessControlType.OTHER, READ 
| WRITE | ADMIN));
    +
    +    // Spout implementation
    +    public static class RandomSentenceSpout extends BaseRichSpout {
    +        SpoutOutputCollector _collector;
    +        BlobStoreAPIWordCountTopology wc;
    +        String key;
    +        NimbusBlobStore store;
    +
    +
    +        @Override
    +        public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
    +            _collector = collector;
    +            wc = new BlobStoreAPIWordCountTopology();
    +            key = "key1";
    +            store = new NimbusBlobStore();
    +            store.prepare(Utils.readStormConfig());
    +        }
    +
    +        @Override
    +        public void nextTuple() {
    +            Utils.sleep(100);
    +            try {
    +                 _collector.emit(new Values(wc.getBlobContent(key, 
store)));
    +            } catch (AuthorizationException | KeyNotFoundException | 
IOException exp) {
    +                throw new RuntimeException(exp);
    +            }
    +        }
    +
    +        @Override
    +        public void ack(Object id) {
    +        }
    +
    +        @Override
    +        public void fail(Object id) {
    +        }
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("word"));
    --- End diff --
    
    I guess you meant that it is the spout's declaration that should read:

    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("sentence"));
    +        }

    rather than that of the bolt, which splits the sentence. I have made the
    changes; thanks for the catch.


> Sample Topology making use of BlobStore API
> -------------------------------------------
>
>                 Key: STORM-1373
>                 URL: https://issues.apache.org/jira/browse/STORM-1373
>             Project: Apache Storm
>          Issue Type: Story
>            Reporter: Sanket Reddy
>            Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to