Repository: flink
Updated Branches:
  refs/heads/release-0.8 2467f36c8 -> 242ff64e0


[streaming] Time trigger preNotify fix


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b2271bd9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b2271bd9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b2271bd9

Branch: refs/heads/release-0.8
Commit: b2271bd9eb3adc1770f40d452ed7fb69614ea649
Parents: 2467f36
Author: Gyula Fora <[email protected]>
Authored: Fri Jan 2 18:33:46 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Sun Jan 4 22:11:13 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/windowing/policy/TimeTriggerPolicy.java    | 6 +++---
 .../api/windowing/policy/TimeTriggerPolicyTest.java          | 8 +++-----
 2 files changed, 6 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b2271bd9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index 3539ad6..57bccf2 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -107,9 +107,9 @@ public class TimeTriggerPolicy<DATA> implements 
ActiveTriggerPolicy<DATA>,
                LinkedList<Object> fakeElements = new LinkedList<Object>();
                // check if there is more then one window border missed
                // use > here. In case >= would fit, the regular call will do 
the job.
-               while (timestamp.getTimestamp(datapoint) > startTime + 
granularity) {
+               while (timestamp.getTimestamp(datapoint) >= startTime + 
granularity) {
                        startTime += granularity;
-                       fakeElements.add(startTime-1);
+                       fakeElements.add(startTime - 1);
                }
                return (Object[]) fakeElements.toArray();
        }
@@ -146,7 +146,7 @@ public class TimeTriggerPolicy<DATA> implements 
ActiveTriggerPolicy<DATA>,
                // start time is excluded, but end time is included: >=
                if (System.currentTimeMillis() >= startTime + granularity) {
                        startTime += granularity;
-                       callback.sendFakeElement(startTime-1);
+                       callback.sendFakeElement(startTime - 1);
                }
 
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b2271bd9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
index 28d35ec..9c77a55 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
@@ -50,8 +50,7 @@ public class TimeTriggerPolicyTest {
                // test different granularity
                for (long granularity = 0; granularity < 31; granularity++) {
                        // create policy
-                       TriggerPolicy<Integer> policy = new 
TimeTriggerPolicy<Integer>(granularity,
-                                       timeStamp);
+                       TriggerPolicy<Integer> policy = new 
TimeTriggerPolicy<Integer>(granularity, timeStamp);
 
                        // remember window border
                        // Remark: This might NOT work in case the timeStamp 
uses
@@ -101,11 +100,10 @@ public class TimeTriggerPolicyTest {
                };
 
                // create policy
-               TimeTriggerPolicy<Integer> policy = new 
TimeTriggerPolicy<Integer>(5,
-                               timeStamp);
+               TimeTriggerPolicy<Integer> policy = new 
TimeTriggerPolicy<Integer>(5, timeStamp);
 
                // expected result
-               Long[][] result = { {}, {}, { 4L, 9L, 14L }, { 24L } };
+               Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } };
 
                // call policy
                for (int i = 0; i < times.length; i++) {

Reply via email to