Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2596#discussion_r176908239
  
    --- Diff: 
examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
 ---
    @@ -53,79 +78,197 @@ public static void main(String[] args) throws 
Exception {
             Fields esFields = new Fields("index", "type", "source");
             EsTupleMapper tupleMapper = 
EsTestUtil.generateDefaultTupleMapper();
             StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
    -        TridentState state = stream.partitionPersist(factory, esFields, 
new EsUpdater(), new Fields());
    +        TridentState state = stream.partitionPersist(factory,
    +                esFields,
    +                new EsUpdater(),
    +                new Fields());
     
             EsTestUtil.startEsNode();
    -        EsTestUtil.waitForSeconds(5);
    +        EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT);
     
    -        StormSubmitter.submitTopology(TOPOLOGY_NAME, new Config(), 
topology.build());
    +        StormSubmitter.submitTopology(TOPOLOGY_NAME,
    +                new Config(),
    +                topology.build());
         }
     
    +    /**
    +     * A fixed batch spout.
    +     */
         public static class FixedBatchSpout implements IBatchSpout {
    -        int maxBatchSize;
    -        HashMap<Long, List<List<Object>>> batches = new HashMap<Long, 
List<List<Object>>>();
    +        private static final long serialVersionUID = 1L;
    +        /**
    +         * The maximum batch size.
    +         */
    +        private int maxBatchSize;
    +        /**
    +         * The passed batches.
    +         */
    +        private HashMap<Long, List<List<Object>>> batches = new 
HashMap<>();
    +        /**
    +         * The output values.
    +         */
             private Values[] outputs = {
    -                new Values("{\"user\":\"user1\"}", "index1", "type1", 
UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user2\"}", "index1", "type2", 
UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user3\"}", "index2", "type1", 
UUID.randomUUID().toString()),
    -                new Values("{\"user\":\"user4\"}", "index2", "type2", 
UUID.randomUUID().toString())
    +            new Values("{\"user\":\"user1\"}",
    +                    "index1",
    +                    "type1",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user2\"}",
    +                    "index1",
    +                    "type2",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user3\"}",
    +                    "index2",
    +                    "type1",
    +                    UUID.randomUUID().toString()),
    +            new Values("{\"user\":\"user4\"}",
    +                    "index2",
    +                    "type2",
    +                    UUID.randomUUID().toString())
             };
    +        /**
    +         * The current index.
    +         */
             private int index = 0;
    -        boolean cycle = false;
    +        /**
    +         * A flag indicating whether cycling ought to be performed.
    +         */
    +        private boolean cycle = false;
     
    -        public FixedBatchSpout(int maxBatchSize) {
    -            this.maxBatchSize = maxBatchSize;
    +        /**
    +         * Creates a new fixed batch spout.
    +         * @param maxBatchSizeArg the maximum batch size to set
    +         */
    +        public FixedBatchSpout(final int maxBatchSizeArg) {
    +            this.maxBatchSize = maxBatchSizeArg;
             }
     
    -        public void setCycle(boolean cycle) {
    -            this.cycle = cycle;
    +        /**
    +         * Sets the cycle flag.
    +         * @param cycleArg the cycle flag value
    +         */
    +        public void setCycle(final boolean cycleArg) {
    +            this.cycle = cycleArg;
             }
     
    +        /**
    +         * Gets the output fields.
    +         * @return the output fields.
    +         */
             @Override
             public Fields getOutputFields() {
                 return new Fields("source", "index", "type", "id");
             }
     
    +        /**
    +         * Open the topology.
    +         * @param conf the configuration to use for opening
    +         * @param context the context to use for opening
    +         */
             @Override
    -        public void open(Map<String, Object> conf, TopologyContext 
context) {
    +        public void open(final Map<String, Object> conf,
    +                final TopologyContext context) {
                 index = 0;
             }
     
    +        /**
    +         * Emits a batch.
    +         * @param batchId the batch id to use
    +         * @param collector the collector to emit to
    +         */
             @Override
    -        public void emitBatch(long batchId, TridentCollector collector) {
    -            List<List<Object>> batch = this.batches.get(batchId);
    +        public void emitBatch(final long batchId,
    +                final TridentCollector collector) {
    +            List<List<Object>> batch = this.getBatches().get(batchId);
                 if (batch == null) {
                     batch = new ArrayList<List<Object>>();
    -                if (index >= outputs.length && cycle) {
    +                if (index >= outputs.length && isCycle()) {
                         index = 0;
                     }
    -                for (int i = 0; i < maxBatchSize; index++, i++) {
    +                for (int i = 0; i < getMaxBatchSize(); index++, i++) {
                         if (index == outputs.length) {
                             index = 0;
                         }
                         batch.add(outputs[index]);
                     }
    -                this.batches.put(batchId, batch);
    +                this.getBatches().put(batchId, batch);
                 }
                 for (List<Object> list : batch) {
                     collector.emit(list);
                 }
             }
     
    +        /**
    +         * Acknoledges the message with id {@code msgId}.
    +         * @param batchId the message id
    +         */
             @Override
    -        public void ack(long batchId) {
    -            this.batches.remove(batchId);
    +        public void ack(final long batchId) {
    +            this.getBatches().remove(batchId);
             }
     
    +        /**
    +         * Close the spout.
    +         */
             @Override
             public void close() {
             }
     
    +        /**
    +         * Get the component configuration.
    +         * @return the component configuration
    +         */
             @Override
             public Map<String, Object> getComponentConfiguration() {
                 Config conf = new Config();
                 conf.setMaxTaskParallelism(1);
                 return conf;
             }
    +
    +        /**
    +         * Get the maximum batch size.
    +         * @return the maxBatchSize
    +         */
    +        public int getMaxBatchSize() {
    --- End diff --
    
    A little unsure why we need these getters and setters?


---

Reply via email to