[ 
https://issues.apache.org/jira/browse/STORM-1167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14989735#comment-14989735
 ] 

ASF GitHub Bot commented on STORM-1167:
---------------------------------------

Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43894940
  
    --- Diff: examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java ---
    @@ -0,0 +1,129 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package storm.starter;
    +
    +import backtype.storm.Config;
    +import backtype.storm.LocalCluster;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.topology.base.BaseWindowedBolt;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.windowing.TupleWindow;
    +import storm.starter.bolt.PrinterBolt;
    +
    +import java.util.Map;
    +import java.util.Random;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
    + * to calculate sliding window sum.
    + */
    +public class SlidingWindowTopology {
    +
    +    /*
    +     * emits random integers every 100 ms
    +     */
    +    private static class RandomIntegerSpout extends BaseRichSpout {
    +        SpoutOutputCollector collector;
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("value"));
    +        }
    +
    +        @Override
    +        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void nextTuple() {
    +            Utils.sleep(100);
    +            Random rand = new Random();
    +            Integer value = rand.nextInt(1000);
    +            collector.emit(new Values(value));
    +        }
    +    }
    +
    +    /*
    +     * Computes sliding window sum
    +     */
    +    private static class SlidingWindowSumBolt extends BaseWindowedBolt {
    +        private int sum = 0;
    +        private OutputCollector collector;
    +
    +        @Override
    +        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void execute(TupleWindow inputWindow) {
    +            System.out.println("Events in current window: " + inputWindow.get().size());
    +            for (Tuple tuple : inputWindow.getNew()) {
    +                sum += (int) tuple.getValue(0);
    +            }
    +            for (Tuple tuple : inputWindow.getExpired()) {
    +                sum -= (int) tuple.getValue(0);
    +            }
    +            collector.emit(new Values(sum));
    --- End diff --
    
    I think this is trying to show an optimization where we don't recalculate
    the entire sum each time; instead, we just add the new values and subtract
    the expired ones.
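
    For illustration only, here is a minimal standalone sketch of that idea
    with no Storm dependency. The class name `IncrementalSum` and its
    `update` method are hypothetical; the two list arguments stand in for
    what `inputWindow.getNew()` and `inputWindow.getExpired()` return in the
    diff above, and the values in `main` are made up.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: keeps a running window sum without rescanning the window. */
public class IncrementalSum {
    private long sum = 0;

    /**
     * Applies one window activation: adds the values that entered the window
     * and subtracts the values that left it. The cost is proportional to the
     * number of changed values, not to the window size.
     */
    public long update(List<Integer> added, List<Integer> expired) {
        for (int value : added) {
            sum += value;
        }
        for (int value : expired) {
            sum -= value;
        }
        return sum;
    }

    public static void main(String[] args) {
        IncrementalSum windowSum = new IncrementalSum();
        // First activation: window holds [1, 2, 3], nothing has expired yet.
        System.out.println(windowSum.update(Arrays.asList(1, 2, 3), Collections.<Integer>emptyList()));
        // Window slides to [2, 3, 4]: 4 is new, 1 has expired.
        System.out.println(windowSum.update(Arrays.asList(4), Arrays.asList(1)));
        // Window slides to [4, 5, 6]: 5 and 6 are new, 2 and 3 have expired.
        System.out.println(windowSum.update(Arrays.asList(5, 6), Arrays.asList(2, 3)));
    }
}
```

    The running sum stays correct only if every tuple that is added is later
    reported as expired exactly once, which is the contract this sketch
    assumes.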


> Add sliding & tumbling window support for core storm
> ----------------------------------------------------
>
>                 Key: STORM-1167
>                 URL: https://issues.apache.org/jira/browse/STORM-1167
>             Project: Apache Storm
>          Issue Type: Improvement
>            Reporter: Arun Mahadevan
>            Assignee: Arun Mahadevan
>
> Currently, topologies that need windowing support require writing custom 
> logic inside bolts, which makes it tedious to handle the windowing and 
> acking logic.
> We can add framework-level support to core Storm bolts to process tuples in a 
> time-based or count-based window. Sliding and tumbling windows can be supported.
> Later this can be extended to the Trident APIs as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
