[ https://issues.apache.org/jira/browse/STORM-1167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14989735#comment-14989735 ]
ASF GitHub Bot commented on STORM-1167: --------------------------------------- Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/855#discussion_r43894940 --- Diff: examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java --- @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.starter; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.topology.base.BaseWindowedBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; +import backtype.storm.windowing.TupleWindow; +import storm.starter.bolt.PrinterBolt; + +import java.util.Map; +import java.util.Random; + +import static backtype.storm.topology.base.BaseWindowedBolt.Count; +import static backtype.storm.topology.base.BaseWindowedBolt.Duration; + +/** + * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt} + * to calculate sliding window sum. + */ +public class SlidingWindowTopology { + + /* + * emits random integers every 100 ms + */ + private static class RandomIntegerSpout extends BaseRichSpout { + SpoutOutputCollector collector; + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("value")); + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + } + + @Override + public void nextTuple() { + Utils.sleep(100); + Random rand = new Random(); + Integer value = rand.nextInt(1000); + collector.emit(new Values(value)); + } + } + + /* + * Computes sliding window sum + */ + private static class SlidingWindowSumBolt extends BaseWindowedBolt { + private int sum = 0; + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(TupleWindow inputWindow) { + System.out.println("Events in current window: " + inputWindow.get().size()); + for (Tuple tuple : inputWindow.getNew()) { + sum += (int) tuple.getValue(0); + } + for (Tuple tuple : inputWindow.getExpired()) { + sum -= (int) tuple.getValue(0); + } + collector.emit(new Values(sum)); --- End diff -- I think this is trying to show an optimization where we dont calculate the entire sum each time, instead we just add the new values and subtract the expired one. > Add sliding & tumbling window support for core storm > ---------------------------------------------------- > > Key: STORM-1167 > URL: https://issues.apache.org/jira/browse/STORM-1167 > Project: Apache Storm > Issue Type: Improvement > Reporter: Arun Mahadevan > Assignee: Arun Mahadevan > > Currently, topologies that needs windowing support requires writing custom > logic inside bolts making it tedious to handle the windowing and acking logic > with custom logic. > We can add framework level support to core storm bolts to process tuples in a > time or a count based window. Sliding and tumbling windows can be supported. > Later this can be extended to trident apis as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)