Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176908022
  
    --- Diff: 
examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
 ---
    @@ -34,52 +39,128 @@
     import org.apache.storm.tuple.Fields;
     import org.apache.storm.tuple.Values;
     
    -public class EsIndexTopology {
    +/**
    + * Demonstrates an ElasticSearch Strom topology.
    + * @author unknown
    + */
    +public final class EsIndexTopology {
     
    +    /**
    +     * The id of the used spout.
    +     */
         static final String SPOUT_ID = "spout";
    +    /**
    +     * The id of the used bolt.
    +     */
         static final String BOLT_ID = "bolt";
    +    /**
    +     * The name of the used topology.
    +     */
         static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";
    +    /**
    +     * The number of pending tuples triggering logging.
    +     */
    +    private static final int PENDING_COUNT_MAX = 1000;
     
    -    public static void main(String[] args) throws Exception {
    +    /**
    +     * The example's main method.
    +     * @param args the command line arguments
    +     * @throws AlreadyAliveException if the topology is already started
    +     * @throws InvalidTopologyException if the topology is invalid
    +     * @throws AuthorizationException if the topology authorization fails
    +     */
    +    public static void main(final String[] args) throws 
AlreadyAliveException,
    +            InvalidTopologyException,
    +            AuthorizationException {
             Config config = new Config();
             config.setNumWorkers(1);
             TopologyBuilder builder = new TopologyBuilder();
             UserDataSpout spout = new UserDataSpout();
             builder.setSpout(SPOUT_ID, spout, 1);
             EsTupleMapper tupleMapper = 
EsTestUtil.generateDefaultTupleMapper();
             EsConfig esConfig = new EsConfig("http://localhost:9300";);
    -        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 
1).shuffleGrouping(SPOUT_ID);
    +        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1)
    +                .shuffleGrouping(SPOUT_ID);
     
             EsTestUtil.startEsNode();
    -        EsTestUtil.waitForSeconds(5);
    -        StormSubmitter.submitTopology(TOPOLOGY_NAME, config, 
builder.createTopology());
    +        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT);
    +        StormSubmitter.submitTopology(TOPOLOGY_NAME,
    +                config,
    +                builder.createTopology());
         }
     
    +    /**
    +     * The user data spout.
    +     */
         public static class UserDataSpout extends BaseRichSpout {
    +        private static final long serialVersionUID = 1L;
    +        /**
    +         * The pending values.
    +         */
             private ConcurrentHashMap<UUID, Values> pending;
    +        /**
    +         * The collector passed in
    +         * {@link #open(java.util.Map, 
org.apache.storm.task.TopologyContext,
    +         * org.apache.storm.spout.SpoutOutputCollector) }.
    +         */
             private SpoutOutputCollector collector;
    +        /**
    +         * The sources.
    +         */
             private String[] sources = {
    -                "{\"user\":\"user1\"}",
    -                "{\"user\":\"user2\"}",
    -                "{\"user\":\"user3\"}",
    -                "{\"user\":\"user4\"}"
    +            "{\"user\":\"user1\"}",
    +            "{\"user\":\"user2\"}",
    +            "{\"user\":\"user3\"}",
    +            "{\"user\":\"user4\"}"
             };
    +        /**
    +         * The current index.
    +         */
             private int index = 0;
    +        /**
    +         * The current count.
    +         */
             private int count = 0;
    +        /**
    +         * The total.
    +         */
             private long total = 0L;
    +        /**
    +         * The index name.
    +         */
             private String indexName = "index1";
    +        /**
    +         * The type name.
    +         */
             private String typeName = "type1";
     
    -        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +        /**
    +         * Declares {@code source}, {@code index}, {@code type} and {@code 
id}.
    +         * @param declarer the declarer to pass to
    +         */
    +        @Override
    +        public void declareOutputFields(final OutputFieldsDeclarer 
declarer) {
                 declarer.declare(new Fields("source", "index", "type", "id"));
             }
     
    -        public void open(Map<String, Object> config, TopologyContext 
context,
    -                         SpoutOutputCollector collector) {
    -            this.collector = collector;
    -            this.pending = new ConcurrentHashMap<UUID, Values>();
    +        /**
    +         * Acquires {@code collector} and initializes {@code pending}.
    +         * @param config unused
    +         * @param context unused
    +         * @param collectorArg the collector to acquire
    +         */
    +        @Override
    +        public void open(final Map<String, Object> config,
    +                final TopologyContext context,
    +                final SpoutOutputCollector collectorArg) {
    +            this.collector = collectorArg;
    +            this.pending = new ConcurrentHashMap<>();
             }
     
    +        /**
    +         * Processes the next tuple.
    --- End diff --
    
    Nit: nextTuple makes the spout emit the next tuple, if any. 


---

Reply via email to