[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15066033#comment-15066033
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/900#issuecomment-166198098
  
Thanks @arunmahadevan, merged into master.


> Support for late and out of order events in time based windows
> --
>
> Key: STORM-1187
> URL: https://issues.apache.org/jira/browse/STORM-1187
> Project: Apache Storm
> Issue Type: Sub-task
> Reporter: Arun Mahadevan
> Assignee: Arun Mahadevan
>
> Right now, time based windows use the timestamp at which the tuple is 
> received by the bolt. 
> However, there are use cases where tuples should be processed based on the 
> time when they were actually generated rather than when they were received. So 
> we need to add support for processing events with a time lag, and also 
> some way to specify and read tuple timestamps.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15066032#comment-15066032
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/900




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15060242#comment-15060242
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/900#issuecomment-165164884
  
+1




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15058315#comment-15058315
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/900#discussion_r47664435
  
--- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
@@ -1697,19 +1697,45 @@
     public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS = "topology.bolts.window.length.duration.ms";
 
     /*
-     * Bolt-specific configuration for windowed bolts to specifiy the sliding interval as a count of number of tuples.
+     * Bolt-specific configuration for windowed bolts to specify the sliding interval as a count of number of tuples.
      */
     @isInteger
     @isPositiveNumber
     public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT = "topology.bolts.window.sliding.interval.count";
 
     /*
-     * Bolt-specific configuration for windowed bolts to specifiy the sliding interval in time duration.
+     * Bolt-specific configuration for windowed bolts to specify the sliding interval in time duration.
      */
     @isInteger
     @isPositiveNumber
     public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS = "topology.bolts.window.sliding.interval.duration.ms";
 
+    /*
+     * Bolt-specific configuration for windowed bolts to specify the name of the field in the tuple that holds
+     * the timestamp (e.g. the ts when the tuple was actually generated). If this config is specified and the
+     * field is not present in the incoming tuple, a java.lang.IllegalArgumentException will be thrown.
+     */
+    @isString
+    public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME = "topology.bolts.tuple.timestamp.field.name";
--- End diff --

@harshach BOLT was added to emphasize that this is a bolt specific 
configuration (used for windowed bolts) similar to the other params above this.
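
For illustration, a minimal sketch of how the new key might be used in a per-bolt configuration map. Only the key string is taken from the diff; the class name, the helper methods, and the use of a plain `Map` as a stand-in for a tuple are assumptions made for this example, not Storm API.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; a Map stands in for a Storm tuple here.
final class TimestampFieldConfigSketch {
    // Key string copied from the Config.java diff.
    static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME =
            "topology.bolts.tuple.timestamp.field.name";

    // Builds a bolt-level config naming the tuple field that holds the event time.
    static Map<String, Object> windowConfig(String timestampField) {
        if (timestampField == null || timestampField.isEmpty()) {
            throw new IllegalArgumentException("timestamp field name must be non-empty");
        }
        Map<String, Object> conf = new HashMap<>();
        conf.put(TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, timestampField);
        return conf;
    }

    // Mirrors the documented behaviour: a missing field raises IllegalArgumentException.
    static long readTimestamp(Map<String, Object> tuple, Map<String, Object> conf) {
        String field = (String) conf.get(TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME);
        Object value = tuple.get(field);
        if (value == null) {
            throw new IllegalArgumentException("tuple has no field named " + field);
        }
        return ((Number) value).longValue();
    }
}
```

Keeping `bolts` in the key groups it with the other `topology.bolts.window.*` settings quoted in the diff, which is the point made in the comment above.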




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15058307#comment-15058307
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/900#discussion_r47663680
  
--- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
@@ -1697,19 +1697,45 @@
     public static final String TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS = "topology.bolts.window.length.duration.ms";
 
     /*
-     * Bolt-specific configuration for windowed bolts to specifiy the sliding interval as a count of number of tuples.
+     * Bolt-specific configuration for windowed bolts to specify the sliding interval as a count of number of tuples.
      */
     @isInteger
     @isPositiveNumber
     public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT = "topology.bolts.window.sliding.interval.count";
 
     /*
-     * Bolt-specific configuration for windowed bolts to specifiy the sliding interval in time duration.
+     * Bolt-specific configuration for windowed bolts to specify the sliding interval in time duration.
      */
     @isInteger
     @isPositiveNumber
     public static final String TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS = "topology.bolts.window.sliding.interval.duration.ms";
 
+    /*
+     * Bolt-specific configuration for windowed bolts to specify the name of the field in the tuple that holds
+     * the timestamp (e.g. the ts when the tuple was actually generated). If this config is specified and the
+     * field is not present in the incoming tuple, a java.lang.IllegalArgumentException will be thrown.
+     */
+    @isString
+    public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME = "topology.bolts.tuple.timestamp.field.name";
--- End diff --

Any reason for this config to have BOLT in its name? Can't we name it 
topology.tuple.timestamp.field.name?




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15057637#comment-15057637
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/900#discussion_r47610293
  
--- Diff: 
storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java ---
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.topology.FailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tracks tuples across input streams and periodically emits watermark events.
+ * Watermark event timestamp is the minimum of the latest tuple timestamps
+ * across all the input streams (minus the lag). Once a watermark event is emitted
+ * any tuple coming with an earlier timestamp can be considered as late events.
+ */
+public class WaterMarkEventGenerator implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(WaterMarkEventGenerator.class);
+    private final WindowManager windowManager;
+    private final int eventTsLag;
+    private final Set<GlobalStreamId> inputStreams;
+    private final Map<GlobalStreamId, Long> streamToTs;
+    private final ScheduledExecutorService executorService;
+    private final ScheduledFuture<?> executorFuture;
+    private long lastWaterMarkTs = 0;
+
+    public WaterMarkEventGenerator(WindowManager windowManager, int interval,
+                                   int eventTsLag, Set<GlobalStreamId> inputStreams) {
+        this.windowManager = windowManager;
+        streamToTs = new ConcurrentHashMap<>();
+        executorService = Executors.newSingleThreadScheduledExecutor();
+        this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
+        this.eventTsLag = eventTsLag;
+        this.inputStreams = inputStreams;
+    }
+
+    public void track(GlobalStreamId stream, long ts) {
+        Long currentVal = streamToTs.get(stream);
+        if (currentVal == null || ts > currentVal) {
+            streamToTs.put(stream, ts);
+        }
+        checkFailures();
+    }
+
+    @Override
+    public void run() {
+        try {
+            long waterMarkTs = computeWaterMarkTs();
+            if (waterMarkTs > lastWaterMarkTs) {
+                this.windowManager.add(new WaterMarkEvent(waterMarkTs - eventTsLag));
+                lastWaterMarkTs = waterMarkTs;
+            }
+        } catch (Throwable th) {
+            LOG.error("Failed while processing watermark event ", th);
+            throw th;
+        }
+    }
+
+    /**
+     * Computes the min ts across all streams.
+     */
+    private long computeWaterMarkTs() {
+        long ts = Long.MIN_VALUE;
+        // only if some data has arrived on each input stream
--- End diff --

The minimum of the latest event timestamps across all input streams (minus 
the lag) is taken as the watermark timestamp. This way, if events from one 
of the streams are delayed more than the others, we don't treat all events 
from that stream as late events.
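
The rule described above can be sketched in isolation as follows. This is a simplified illustration rather than the code from the pull request: stream ids are plain strings instead of `GlobalStreamId`, the class and method names are made up for the example, and returning `null` until every stream has reported is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only; stream ids are plain strings here, not GlobalStreamId.
final class WatermarkSketch {
    private final Set<String> inputStreams;
    private final long lagMs;
    // Latest (maximum) timestamp seen so far on each stream.
    private final Map<String, Long> latestTs = new HashMap<>();

    WatermarkSketch(Set<String> inputStreams, long lagMs) {
        if (inputStreams.isEmpty()) {
            throw new IllegalArgumentException("at least one input stream is required");
        }
        this.inputStreams = inputStreams;
        this.lagMs = lagMs;
    }

    // Keep only the largest timestamp per stream, as track() does in the diff.
    void track(String stream, long ts) {
        latestTs.merge(stream, ts, Math::max);
    }

    // Returns null until every input stream has delivered at least one tuple
    // (an assumption of this sketch); otherwise min(latest per stream) - lag.
    Long watermark() {
        long min = Long.MAX_VALUE;
        for (String stream : inputStreams) {
            Long ts = latestTs.get(stream);
            if (ts == null) {
                return null;
            }
            min = Math.min(min, ts);
        }
        return min - lagMs;
    }
}
```

With two streams and a lag of 5, tracking timestamp 100 on one stream and 50 on the other yields a watermark of 45, so the slower stream holds the watermark back and its tuples are not classified as late.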



[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15057318#comment-15057318
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user satishd commented on the pull request:

https://github.com/apache/storm/pull/900#issuecomment-164641923
  
Overall LGTM, +1 




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15057317#comment-15057317
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/900#discussion_r47595521
  
--- Diff: storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java 
---
@@ -39,4 +63,12 @@
  * @param event the input event to be tracked
  */
 void track(Event event);
+
+/**
+ * Sets a context in the eviction policy that can be used while 
evicting the events.
+ * E.g. For TimeEvictionPolicy, this could be used to set the 
reference timestamp.
+ *
+ * @param context
+ */
+void setContext(Object context);
--- End diff --

We may want to introduce a class/interface EvictionPolicyContext, which can 
have an opaque structure for now. This can be done later, as this PR has been 
waiting for a long time.
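
One possible shape for such a typed context is sketched below. Every name here (`EvictionPolicyContext`, `getReferenceTime`, `TimeEvictionSketch`) is hypothetical and chosen only to illustrate the suggestion; the eviction rule is likewise an assumption, not the behaviour of the `TimeEvictionPolicy` in the pull request.

```java
// Hypothetical names throughout; none of these types are taken from the pull request.
interface EvictionPolicyContext {
    // Reference time (in ms) against which event timestamps are compared.
    long getReferenceTime();
}

final class TimeEvictionSketch {
    private final long windowLengthMs;
    private EvictionPolicyContext context;

    TimeEvictionSketch(long windowLengthMs) {
        this.windowLengthMs = windowLengthMs;
    }

    // Typed alternative to setContext(Object): no cast is needed inside the policy.
    void setContext(EvictionPolicyContext context) {
        this.context = context;
    }

    // An event is evicted once it is at least one window length older than the reference time.
    boolean evict(long eventTs) {
        if (context == null) {
            throw new IllegalStateException("context not set");
        }
        return context.getReferenceTime() - eventTs >= windowLengthMs;
    }
}
```

A typed interface lets new fields be added to the context later without changing the `setContext` signature again.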





[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15057314#comment-15057314
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/900#discussion_r47595114
  
--- Diff: 
storm-core/test/jvm/backtype/storm/windowing/WaterMarkEventGeneratorTest.java 
---
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import backtype.storm.generated.GlobalStreamId;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for {@link WaterMarkEventGeneratorTest}
--- End diff --

Minor typo: Unit tests for {@link WaterMarkEventGenerator} instead of Unit 
tests for {@link WaterMarkEventGeneratorTest}




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15056749#comment-15056749
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user Parth-Brahmbhatt commented on the pull request:

https://github.com/apache/storm/pull/900#issuecomment-164565516
  
Overall I am +1. 




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15056738#comment-15056738
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/900#discussion_r47559536
  
--- Diff: 
storm-core/src/jvm/backtype/storm/windowing/WaterMarkEventGenerator.java ---
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.windowing;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.topology.FailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tracks tuples across input streams and periodically emits watermark events.
+ * Watermark event timestamp is the minimum of the latest tuple timestamps
+ * across all the input streams (minus the lag). Once a watermark event is emitted
+ * any tuple coming with an earlier timestamp can be considered as late events.
+ */
+public class WaterMarkEventGenerator implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(WaterMarkEventGenerator.class);
+    private final WindowManager windowManager;
+    private final int eventTsLag;
+    private final Set<GlobalStreamId> inputStreams;
+    private final Map<GlobalStreamId, Long> streamToTs;
+    private final ScheduledExecutorService executorService;
+    private final ScheduledFuture<?> executorFuture;
+    private long lastWaterMarkTs = 0;
+
+    public WaterMarkEventGenerator(WindowManager windowManager, int interval,
+                                   int eventTsLag, Set<GlobalStreamId> inputStreams) {
+        this.windowManager = windowManager;
+        streamToTs = new ConcurrentHashMap<>();
+        executorService = Executors.newSingleThreadScheduledExecutor();
+        this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
+        this.eventTsLag = eventTsLag;
+        this.inputStreams = inputStreams;
+    }
+
+    public void track(GlobalStreamId stream, long ts) {
+        Long currentVal = streamToTs.get(stream);
+        if (currentVal == null || ts > currentVal) {
+            streamToTs.put(stream, ts);
+        }
+        checkFailures();
+    }
+
+    @Override
+    public void run() {
+        try {
+            long waterMarkTs = computeWaterMarkTs();
+            if (waterMarkTs > lastWaterMarkTs) {
+                this.windowManager.add(new WaterMarkEvent(waterMarkTs - eventTsLag));
+                lastWaterMarkTs = waterMarkTs;
+            }
+        } catch (Throwable th) {
+            LOG.error("Failed while processing watermark event ", th);
+            throw th;
+        }
+    }
+
+    /**
+     * Computes the min ts across all streams.
+     */
+    private long computeWaterMarkTs() {
+        long ts = Long.MIN_VALUE;
+        // only if some data has arrived on each input stream
--- End diff --

Why do we have to wait for data to arrive on each input stream? 



[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15056720#comment-15056720
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/900#discussion_r47557478
  
--- Diff: docs/documentation/Windowing.md ---
@@ -126,6 +126,96 @@ Time duration based tumbling window that tumbles after the specified time durati
 
 ```
 
+## Tuple timestamp and out of order tuples
+By default the timestamp tracked in the window is the time when the tuple is processed by the bolt. The window calculations
+are performed based on the processing timestamp. Storm has support for tracking windows based on the source generated timestamp.
+
+```java
+/**
+* Specify a field in the tuple that represents the timestamp as a long value. If this
+* field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
+*
+* @param fieldName the name of the field that contains the timestamp
+*/
+public BaseWindowedBolt withTimestampField(String fieldName)
+```
+
+The value for the above `fieldName` will be looked up from the incoming tuple and considered for windowing calculations. 
+If the field is not present in the tuple an exception will be thrown. Along with the timestamp field name, a time lag parameter 
+can also be specified which indicates the max time limit for tuples with out of order timestamps. 
+
+E.g. If the lag is 5 secs and a tuple `t1` arrived with timestamp `06:00:05` no tuples may arrive with tuple timestamp earlier than `06:00:00`. If a tuple
+arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, it will be treated as a late tuple and not processed. 
--- End diff --

Let's also document how users can find out the number of discarded tuples. In 
many cases it may also be useful to provide a handler for tuples being 
discarded, but I am fine with not including that in this patch.
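
To make the request concrete, here is a rough sketch of a late-tuple counter with an optional handler, using the 5 second lag example from the documentation diff. The class `LateTupleTracker` and its methods are hypothetical and not part of the pull request. As a simplification, the late boundary here advances with every accepted tuple, whereas the patch emits watermarks periodically.

```java
import java.util.function.LongConsumer;

// Hypothetical helper, not part of the pull request.
final class LateTupleTracker {
    private final long lagMs;
    private final LongConsumer lateHandler;
    private boolean seenAny = false;
    private long maxTs = Long.MIN_VALUE;
    private long lateCount = 0;

    LateTupleTracker(long lagMs, LongConsumer lateHandler) {
        this.lagMs = lagMs;
        this.lateHandler = lateHandler;
    }

    // Returns true if the tuple is accepted. A tuple older than
    // (largest timestamp seen - lag) is counted as late and passed to the handler.
    boolean accept(long ts) {
        if (seenAny && ts < maxTs - lagMs) {
            lateCount++;
            lateHandler.accept(ts);
            return false;
        }
        seenAny = true;
        maxTs = Math.max(maxTs, ts);
        return true;
    }

    // Number of tuples discarded as late so far.
    long lateCount() {
        return lateCount;
    }
}
```

With a lag of 5000 ms, accepting a tuple stamped `06:00:05` puts the boundary at `06:00:00`, so a following tuple stamped `05:59:59` is counted as late while one stamped exactly `06:00:00` is still accepted.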




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048422#comment-15048422
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user arunmahadevan commented on the pull request:

https://github.com/apache/storm/pull/900#issuecomment-163171738
  
@HeartSaVioR I've upmerged.




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15047819#comment-15047819
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user HeartSaVioR commented on the pull request:

https://github.com/apache/storm/pull/900#issuecomment-163075626
  
@arunmahadevan I've merged #925, so please upmerge. Maybe you can ignore 
master's changeset and take #900.




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15045589#comment-15045589
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/900#issuecomment-162641124
  
> @d2r It seems to be due to timing issues since the unit tests relied on 
Thread.sleep(). I've removed sleep and updated WaterMarkEventGeneratorTest to 
trigger manually.
> 
> Will also review other unit tests for similar issues.

OK, sounds good to me.  Thanks.




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15045588#comment-15045588
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user arunmahadevan commented on the pull request:

https://github.com/apache/storm/pull/900#issuecomment-162640534
  
@d2r It seems to be due to timing issues since the unit tests relied on 
Thread.sleep(). I've removed sleep and updated `WaterMarkEventGeneratorTest` to 
trigger manually.

Will also review other unit tests for similar issues.
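
As a generic illustration of the approach described above (not the actual test code from the pull request), a periodic task can be written so that a test invokes `run()` directly instead of sleeping and hoping the scheduler has fired. The class `ManualTriggerEmitter` below is a hypothetical stand-in for the generator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a periodically scheduled task.
final class ManualTriggerEmitter implements Runnable {
    private long latest = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    // Remember the largest timestamp seen so far.
    void track(long ts) {
        latest = Math.max(latest, ts);
    }

    // Emits the latest timestamp only if it advanced since the previous run.
    @Override
    public void run() {
        if (latest > lastEmitted) {
            emitted.add(latest);
            lastEmitted = latest;
        }
    }

    List<Long> emitted() {
        return emitted;
    }
}
```

A test can then call `track(100)` followed by `run()` and read `emitted().get(0)` immediately. Reading the list before the task has run would find it empty, which is consistent with (though not confirmed as the cause of) the `IndexOutOfBoundsException: Index: 0, Size: 0` reported from the CI run.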




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15045542#comment-15045542
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/900#issuecomment-162632780
  
From the Travis CI run, I see two errors from WaterMarkEventGeneratorTest:

```
Running backtype.storm.windowing.WaterMarkEventGeneratorTest

Tests run: 4, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 0.651 sec <<< FAILURE! - in backtype.storm.windowing.WaterMarkEventGeneratorTest

testTrackTwoStreams(backtype.storm.windowing.WaterMarkEventGeneratorTest)  Time elapsed: 0.13 sec  <<< ERROR!

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:635)
    at java.util.ArrayList.get(ArrayList.java:411)
    at backtype.storm.windowing.WaterMarkEventGeneratorTest.testTrackTwoStreams(WaterMarkEventGeneratorTest.java:90)

testTrackSingleStream(backtype.storm.windowing.WaterMarkEventGeneratorTest)  Time elapsed: 0.191 sec  <<< ERROR!

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:635)
    at java.util.ArrayList.get(ArrayList.java:411)
    at backtype.storm.windowing.WaterMarkEventGeneratorTest.testTrackSingleStream(WaterMarkEventGeneratorTest.java:62)
```





[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15043497#comment-15043497
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/900#discussion_r46762681
  
--- Diff: storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java ---
@@ -21,21 +21,30 @@
  * Eviction policy that evicts events based on time duration.
  */
 public class TimeEvictionPolicy implements EvictionPolicy {
-private final long duration;
+private final int windowLength;
+protected Long currentTs;
--- End diff --

It's the reference time for the window calculation, against which events 
are expired from the window. Will rename it to `referenceTime`.
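
As a rough sketch of what "reference time" means for eviction: an event is expired once it is at least one window length older than the reference time. The `windowLength` and `referenceTime` names follow the diff and the comment above; the class name, the method names, and the wall-clock fallback are assumptions for illustration, not the code in the pull request.

```java
/** Hypothetical sketch of time-based eviction against a reference time. */
final class ReferenceTimeEviction {
    private final int windowLengthMs;
    // Null until a reference time is supplied (for example, from a watermark).
    private Long referenceTime;

    ReferenceTimeEviction(int windowLengthMs) {
        this.windowLengthMs = windowLengthMs;
    }

    /** Sets the time against which event ages are measured. */
    public void setReferenceTime(long referenceTime) {
        this.referenceTime = referenceTime;
    }

    /**
     * Returns true if the event falls outside the window ending at the
     * reference time. Falls back to the wall clock if none was set (assumption).
     */
    public boolean shouldEvict(long eventTs) {
        long now = (referenceTime != null) ? referenceTime : System.currentTimeMillis();
        return now - eventTs >= windowLengthMs;
    }
}
```

With a 1000 ms window and a reference time of 5000, an event stamped 4000 is evicted while one stamped 4001 is kept.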




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-12-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15041773#comment-15041773
 ] 

ASF GitHub Bot commented on STORM-1187:
---

Github user rfarivar commented on a diff in the pull request:

https://github.com/apache/storm/pull/900#discussion_r46704537
  
--- Diff: storm-core/src/jvm/backtype/storm/windowing/TimeEvictionPolicy.java ---
@@ -21,21 +21,30 @@
  * Eviction policy that evicts events based on time duration.
  */
 public class TimeEvictionPolicy implements EvictionPolicy {
-private final long duration;
+private final int windowLength;
+protected Long currentTs;
--- End diff --

What does `currentTs` represent? May I suggest renaming it to something more 
descriptive?




[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15022052#comment-15022052
 ] 

ASF GitHub Bot commented on STORM-1187:
---

GitHub user arunmahadevan opened a pull request:

https://github.com/apache/storm/pull/900

STORM-1187 Support windowing based on tuple ts

Support for doing window calculations based on tuple timestamps and handling
out-of-order events based on a time lag.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arunmahadevan/storm windowing-eventts

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/900.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #900


commit 3c73fc72a92a0b4f0df805081868c2177768bd25
Author: Arun Mahadevan 
Date:   2015-11-13T12:36:56Z

STORM-1187 Support windowing based on tuple ts

Support for doing window calculations based on tuple timestamps and handling
out-of-order events based on a time lag.
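
For readers unfamiliar with lag-based handling of out-of-order events, the following is a minimal sketch of the general technique, not the code in this pull request. It assumes a watermark of "highest timestamp seen minus lag"; `LagReorderBuffer` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch: buffers out-of-order timestamps and releases them in order. */
final class LagReorderBuffer {
    private final long lagMs;
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    private long maxSeenTs = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    LagReorderBuffer(long lagMs) {
        this.lagMs = lagMs;
    }

    /** Buffers an event; returns false (dropping it) if it is at or below the watermark. */
    public boolean add(long eventTs) {
        if (eventTs <= watermark) {
            return false; // too late: the watermark has already passed this timestamp
        }
        pending.add(eventTs);
        maxSeenTs = Math.max(maxSeenTs, eventTs);
        return true;
    }

    /** Advances the watermark to (max seen - lag) and drains events at or below it, oldest first. */
    public List<Long> drainReady() {
        List<Long> ready = new ArrayList<>();
        if (maxSeenTs == Long.MIN_VALUE) {
            return ready; // nothing seen yet
        }
        watermark = Math.max(watermark, maxSeenTs - lagMs);
        while (!pending.isEmpty() && pending.peek() <= watermark) {
            ready.add(pending.poll());
        }
        return ready;
    }
}
```

A larger lag tolerates more disorder at the cost of holding events longer before they are released.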






[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-11-18 Thread Arun Mahadevan (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15012801#comment-15012801
 ] 

Arun Mahadevan commented on STORM-1187:
---

The plan is to add support to core first and roll it out to Trident as the next 
step. Since Trident handles one batch at a time, we need to figure out the 
approach for Trident windowing overall.



[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-11-18 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15012663#comment-15012663
 ] 

冯健 commented on STORM-1187:
---

Great, that is exactly what I thought it would be. Will Trident also support 
this at a high level?



[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-11-18 Thread Arun Mahadevan (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15011489#comment-15011489
 ] 

Arun Mahadevan commented on STORM-1187:
---

At a high level, this is captured under "Tuple timestamp and out of order 
tuples" at:

https://github.com/arunmahadevan/storm/blob/cd051103bd05be52d4f621b847da8a200f7e7c79/docs/documentation/Windowing.md#tuple-timestamp-and-out-of-order-tuples

I am mostly done with the changes and can raise a PR for review once PR #855 
is merged.



[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-11-18 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15011207#comment-15011207
 ] 

Parth Brahmbhatt commented on STORM-1187:
-

[~arunmahadevan] Do you want to post your design doc for review?



[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-11-18 Thread Robert Joseph Evans (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15011136#comment-15011136
 ] 

Robert Joseph Evans commented on STORM-1187:


Yes, event time is just another dimension that we can put into buckets and 
aggregate on, but the hard part is knowing when you can consider a bucket 
complete and when you can consider a bucket completely dead, so that no more 
late data will be accepted. A lot of this depends on your use case and where 
the aggregated data is stored. I really think the Cloud Dataflow API captures 
most of what an API like this should support. If we could make it cleaner, that 
would be great, but what we are doing is complex enough that I am not sure we 
really can make it that much cleaner.
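
One way to make the "complete" versus "dead" distinction concrete is sketched below. This is only an illustration under stated assumptions: fixed-size buckets, a watermark supplied by the caller, and an allowed-lateness bound. `EventTimeBuckets` and its members are hypothetical names, not an existing Storm or Dataflow API.

```java
/** Hypothetical sketch: fixed-size event-time buckets with an allowed-lateness bound. */
final class EventTimeBuckets {
    enum State { OPEN, COMPLETE, DEAD }

    private final long bucketSizeMs;
    private final long allowedLatenessMs;

    EventTimeBuckets(long bucketSizeMs, long allowedLatenessMs) {
        this.bucketSizeMs = bucketSizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** Start (inclusive) of the bucket that an event timestamp falls into. */
    public long bucketStart(long eventTs) {
        return Math.floorDiv(eventTs, bucketSizeMs) * bucketSizeMs;
    }

    /**
     * OPEN:     the watermark has not reached the bucket end; on-time data may still arrive.
     * COMPLETE: the watermark reached the end; results can be emitted, late data may still update them.
     * DEAD:     the watermark reached end + allowed lateness; further late data is rejected.
     */
    public State stateOf(long bucketStart, long watermark) {
        long end = bucketStart + bucketSizeMs;
        if (watermark < end) {
            return State.OPEN;
        }
        if (watermark < end + allowedLatenessMs) {
            return State.COMPLETE;
        }
        return State.DEAD;
    }
}
```

How large the allowed lateness should be is exactly the use-case-dependent choice mentioned above: it trades completeness of results against how long bucket state must be kept.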



[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-11-17 Thread Arun Mahadevan (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15010316#comment-15010316
 ] 

Arun Mahadevan commented on STORM-1187:
---

Thanks for the input. Yes, the plan is to handle event-based timestamps roughly 
along those lines using the concept of "watermarks", and I am working on it.



[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows

2015-11-17 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15010117#comment-15010117
 ] 

冯健 commented on STORM-1187:
---

This paper from Google 
(http://www.cs.cmu.edu/~pavlo/courses/fall2013/static/papers/p734-akidau.pdf) 
describes a framework for building real-time computation applications. It 
provides a concept called "low watermarks", which indicate that all data up to 
a given timestamp (generation time) has been received. Using this concept, we 
can distinguish whether events are simply delayed or actually missing, and 
out-of-order events are no longer a problem.
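
As a rough illustration of the low-watermark idea (a simplified sketch, not the paper's or Storm's implementation): each input stream reports the latest event timestamp it has seen, and the low watermark is taken as the minimum across streams. `LowWatermarkTracker` and its methods are hypothetical names, and treating "latest timestamp per stream" as that stream's progress is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: tracks a low watermark over several input streams. */
final class LowWatermarkTracker {
    // Latest event (generation) timestamp seen on each stream.
    private final Map<String, Long> latestTsPerStream = new HashMap<>();
    private final int expectedStreams;

    LowWatermarkTracker(int expectedStreams) {
        this.expectedStreams = expectedStreams;
    }

    /** Records an event timestamp for a stream; timestamps may arrive out of order. */
    public void observe(String stream, long eventTs) {
        latestTsPerStream.merge(stream, eventTs, Math::max);
    }

    /**
     * Returns the smallest of the per-stream latest timestamps, or
     * Long.MIN_VALUE until every expected stream has reported at least once.
     */
    public long lowWatermark() {
        if (latestTsPerStream.size() < expectedStreams) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long ts : latestTsPerStream.values()) {
            min = Math.min(min, ts);
        }
        return min;
    }

    /** An event at or below the current low watermark is treated as late. */
    public boolean isLate(long eventTs) {
        return eventTs <= lowWatermark();
    }
}
```

Taking the minimum means a slow stream holds the watermark back, which is what lets a consumer tell "delayed" apart from "not there" once the watermark has passed a timestamp.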
