Repository: incubator-flink Updated Branches: refs/heads/master 561eaf047 -> f71b0c42f
[streaming] Time trigger preNotify fix Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/87d699d7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/87d699d7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/87d699d7 Branch: refs/heads/master Commit: 87d699d7025f6ceadeb64fe972b7185079b6ec22 Parents: 561eaf0 Author: Gyula Fora <[email protected]> Authored: Fri Jan 2 18:33:46 2015 +0100 Committer: Gyula Fora <[email protected]> Committed: Fri Jan 2 18:33:46 2015 +0100 ---------------------------------------------------------------------- .../streaming/api/windowing/policy/TimeTriggerPolicy.java | 6 +++--- .../api/windowing/policy/TimeTriggerPolicyTest.java | 8 +++----- 2 files changed, 6 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/87d699d7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java index 3539ad6..57bccf2 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java @@ -107,9 +107,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>, LinkedList<Object> fakeElements = new LinkedList<Object>(); // check if there is more then one window border missed // use > here. In case >= would fit, the regular call will do the job. - while (timestamp.getTimestamp(datapoint) > startTime + granularity) { + while (timestamp.getTimestamp(datapoint) >= startTime + granularity) { startTime += granularity; - fakeElements.add(startTime-1); + fakeElements.add(startTime - 1); } return (Object[]) fakeElements.toArray(); } @@ -146,7 +146,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>, // start time is excluded, but end time is included: >= if (System.currentTimeMillis() >= startTime + granularity) { startTime += granularity; - callback.sendFakeElement(startTime-1); + callback.sendFakeElement(startTime - 1); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/87d699d7/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java index 28d35ec..9c77a55 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java @@ -50,8 +50,7 @@ public class TimeTriggerPolicyTest { // test different granularity for (long granularity = 0; granularity < 31; granularity++) { // create policy - TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, - timeStamp); + TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, timeStamp); // remember window border // Remark: This might NOT work in case the timeStamp uses @@ -101,11 +100,10 @@ public class TimeTriggerPolicyTest { }; // create policy - TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5, - timeStamp); + TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5, timeStamp); // expected result - Long[][] result = { {}, {}, { 4L, 9L, 14L }, { 24L } }; + Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } }; // call policy for (int i = 0; i < times.length; i++) {
