[
https://issues.apache.org/jira/browse/STORM-1167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14990125#comment-14990125
]
ASF GitHub Bot commented on STORM-1167:
---------------------------------------
Github user Parth-Brahmbhatt commented on a diff in the pull request:
https://github.com/apache/storm/pull/855#discussion_r43918998
--- Diff:
storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.topology.base;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IWindowedBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class BaseWindowedBolt implements IWindowedBolt {
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseWindowedBolt.class);
+
+ private transient Map<String, Object> windowConfiguration;
+
+ /**
+ * Holds a count value for count based windows and sliding intervals.
+ */
+ public static class Count {
+ public final int value;
+
+ public Count(int value) {
+ this.value = value;
+ }
+ }
+
+ /**
+ * Holds a Time duration for time based windows and sliding intervals.
+ */
+ public static class Duration {
+ public final int value;
+
+ public Duration(int value, TimeUnit timeUnit) {
+ this.value = (int) timeUnit.toMillis(value);
+ }
+ }
+
+ protected BaseWindowedBolt() {
+ windowConfiguration = new HashMap<>();
+ }
+
+ private BaseWindowedBolt withWindowLength(Count count) {
+ windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT,
count.value);
+ return this;
+ }
+
+ private BaseWindowedBolt withWindowLength(Duration duration) {
+
windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS,
duration.value);
+ return this;
+ }
+
+ private BaseWindowedBolt withSlidingInterval(Count count) {
+
windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT,
count.value);
+ return this;
+ }
+
+ private BaseWindowedBolt withSlidingInterval(Duration duration) {
+
windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS,
duration.value);
+ return this;
+ }
+
+ /**
+ * Tuple count based sliding window configuration.
+ *
+ * @param windowLength the number of tuples in the window
+ * @param slidingInterval the number of tuples after which the window
slides
+ */
+ public BaseWindowedBolt withWindow(Count windowLength, Count
slidingInterval) {
+ return
withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+ }
+
+ /**
+ * Tuple count and time duration based sliding window configuration.
+ *
+ * @param windowLength the number of tuples in the window
+ * @param slidingInterval the time duration after which the window
slides
+ */
+ public BaseWindowedBolt withWindow(Count windowLength, Duration
slidingInterval) {
+ return
withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+ }
+
+ /**
+ * Time duration and count based sliding window configuration.
+ *
+ * @param windowLength the time duration of the window
+ * @param slidingInterval the number of tuples after which the window
slides
+ */
+ public BaseWindowedBolt withWindow(Duration windowLength, Count
slidingInterval) {
+ return
withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+ }
+
+ /**
+ * Time duration based sliding window configuration.
+ *
+ * @param windowLength the time duration of the window
+ * @param slidingInterval the time duration after which the window
slides
+ */
+ public BaseWindowedBolt withWindow(Duration windowLength, Duration
slidingInterval) {
+ return
withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+ }
+
+ /**
+ * A tuple count based window that slides with every incoming tuple.
+ *
+ * @param windowLength the number of tuples in the window
+ */
+ public BaseWindowedBolt withWindow(Count windowLength) {
+ return withWindowLength(windowLength).withSlidingInterval(new
Count(1));
+ }
+
+
+ /**
+ * A time duration based window that slides with every incoming tuple.
+ *
+ * @param windowLength the time duration of the window
+ */
+ public BaseWindowedBolt withWindow(Duration windowLength) {
+ return withWindowLength(windowLength).withSlidingInterval(new
Count(1));
+ }
+
+ /**
+ * A count based tumbling window.
+ *
+ * @param count the number of tuples after which the window tumbles
+ */
+ public BaseWindowedBolt withTumblingWindow(Count count) {
+ return withWindowLength(count).withSlidingInterval(count);
+ }
+
+ /**
+ * A time duration based tumbling window.
+ *
+ * @param duration the time duration after which the window tumbles
+ */
+ public BaseWindowedBolt withTumblingWindow(Duration duration) {
+ return withWindowLength(duration).withSlidingInterval(duration);
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
+ }
+
+ @Override
+ public void execute(TupleWindow inputWindow) {
+
--- End diff --
+1.
> Add sliding & tumbling window support for core storm
> ----------------------------------------------------
>
> Key: STORM-1167
> URL: https://issues.apache.org/jira/browse/STORM-1167
> Project: Apache Storm
> Issue Type: Improvement
> Reporter: Arun Mahadevan
> Assignee: Arun Mahadevan
>
> Currently, topologies that needs windowing support requires writing custom
> logic inside bolts making it tedious to handle the windowing and acking logic
> with custom logic.
> We can add framework level support to core storm bolts to process tuples in a
> time or a count based window. Sliding and tumbling windows can be supported.
> Later this can be extended to trident apis as well.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)