[
https://issues.apache.org/jira/browse/STORM-1167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14991731#comment-14991731
]
ASF GitHub Bot commented on STORM-1167:
---------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/855#discussion_r44016654
--- Diff:
storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.topology.base;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IWindowedBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class BaseWindowedBolt implements IWindowedBolt {
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseWindowedBolt.class);
+
+ private final transient Map<String, Object> windowConfiguration;
+
+ /**
+ * Holds a count value for count based windows and sliding intervals.
+ */
+ public static class Count {
+ public final int value;
+
+ public Count(int value) {
+ this.value = value;
+ }
+ }
+
+ /**
+ * Holds a Time duration for time based windows and sliding intervals.
+ */
+ public static class Duration {
+ public final int value;
+
+ public Duration(int value, TimeUnit timeUnit) {
+ this.value = (int) timeUnit.toMillis(value);
+ }
+ }
+
+ protected BaseWindowedBolt() {
+ windowConfiguration = new HashMap<>();
+ }
+
+ private BaseWindowedBolt withWindowLength(Count count) {
+ windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT,
count.value);
+ return this;
+ }
+
+ private BaseWindowedBolt withWindowLength(Duration duration) {
+
windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS,
duration.value);
+ return this;
+ }
+
+ private BaseWindowedBolt withSlidingInterval(Count count) {
+
windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT,
count.value);
+ return this;
+ }
+
+ private BaseWindowedBolt withSlidingInterval(Duration duration) {
+
windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS,
duration.value);
+ return this;
+ }
+
+ /**
+ * Tuple count based sliding window configuration.
+ *
+ * @param windowLength the number of tuples in the window
+ * @param slidingInterval the number of tuples after which the window
slides
+ */
+ public BaseWindowedBolt withWindow(Count windowLength, Count
slidingInterval) {
+ return
withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+ }
+
+ /**
+ * Tuple count and time duration based sliding window configuration.
+ *
+ * @param windowLength the number of tuples in the window
+ * @param slidingInterval the time duration after which the window
slides
+ */
+ public BaseWindowedBolt withWindow(Count windowLength, Duration
slidingInterval) {
+ return
withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+ }
+
+ /**
+ * Time duration and count based sliding window configuration.
+ *
+ * @param windowLength the time duration of the window
+ * @param slidingInterval the number of tuples after which the window
slides
+ */
+ public BaseWindowedBolt withWindow(Duration windowLength, Count
slidingInterval) {
+ return
withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+ }
+
+ /**
+ * Time duration based sliding window configuration.
+ *
+ * @param windowLength the time duration of the window
+ * @param slidingInterval the time duration after which the window
slides
+ */
+ public BaseWindowedBolt withWindow(Duration windowLength, Duration
slidingInterval) {
+ return
withWindowLength(windowLength).withSlidingInterval(slidingInterval);
+ }
+
+ /**
+ * A tuple count based window that slides with every incoming tuple.
+ *
+ * @param windowLength the number of tuples in the window
+ */
+ public BaseWindowedBolt withWindow(Count windowLength) {
+ return withWindowLength(windowLength).withSlidingInterval(new
Count(1));
+ }
+
+
--- End diff --
Extra line
> Add sliding & tumbling window support for core storm
> ----------------------------------------------------
>
> Key: STORM-1167
> URL: https://issues.apache.org/jira/browse/STORM-1167
> Project: Apache Storm
> Issue Type: Improvement
> Reporter: Arun Mahadevan
> Assignee: Arun Mahadevan
>
> Currently, topologies that need windowing support require writing custom
> logic inside bolts, which makes it tedious to handle the windowing and
> acking logic.
> We can add framework-level support to core Storm bolts to process tuples in a
> time-based or count-based window. Sliding and tumbling windows can be supported.
> Later, this can be extended to the Trident APIs as well.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)