Repository: flink
Updated Branches:
  refs/heads/master 501a9b085 -> dd51c9774


Add Window parameter to KeyedWindowFunction, move to package windowing


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6610caec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6610caec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6610caec

Branch: refs/heads/master
Commit: 6610caeca4093fced35d478ffc596f935912b4c1
Parents: bfaad37
Author: Aljoscha Krettek <[email protected]>
Authored: Thu Sep 24 17:39:19 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Sep 28 17:04:16 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/KeyedWindowDataStream.java   |  5 ++-
 .../windowing/KeyedWindowFunction.java          | 45 ++++++++++++++++++++
 .../functions/windows/KeyedWindowFunction.java  | 44 -------------------
 ...ractAlignedProcessingTimeWindowOperator.java |  3 +-
 .../windowing/AbstractKeyedTimePanes.java       |  3 +-
 .../windowing/AccumulatingKeyedTimePanes.java   | 23 ++++++----
 ...ccumulatingProcessingTimeWindowOperator.java |  7 +--
 .../windowing/AggregatingKeyedTimePanes.java    |  3 +-
 .../operators/windowing/PolicyToOperator.java   |  5 ++-
 ...AlignedProcessingTimeWindowOperatorTest.java | 17 ++++----
 .../flink/streaming/util/TestHarnessUtil.java   | 17 ++++++++
 .../GroupedProcessingTimeWindowExample.java     |  7 +--
 12 files changed, 105 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index 711a959..e658bdd 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -25,9 +25,10 @@ import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
 
 /**
@@ -107,7 +108,7 @@ public class KeyedWindowDataStream<T, K> {
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
         */
-       public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<T, 
Result, K> function) {
+       public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<T, 
Result, K, Window> function) {
                String callLocation = Utils.getCallLocationName();
 
                TypeInformation<T> inType = input.getType();

http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java
new file mode 100644
index 0000000..77ce53e
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for functions that are evaluated over keyed (grouped) 
windows.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ */
+public interface KeyedWindowFunction<IN, OUT, KEY, W extends Window> extends 
Function, Serializable {
+
+       /**
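+        * Evaluates the window contents for the given key.
+        * @param key The key for which this window is evaluated.
+        * @param window The window that is being evaluated.
+        * @param values The elements in the window being evaluated.
+        * @param out A collector for emitting elements.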
+        * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery. 
+        */
+       void evaluate(KEY key, W window, Iterable<IN> values, Collector<OUT> 
out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
deleted file mode 100644
index b4e55e4..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windows;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * Base interface for functions that are evaluated over keyed (grouped) 
windows.
- *
- * @param <IN> The type of the input value.
- * @param <OUT> The type of the output value.
- * @param <KEY> The type of the key.
- */
-public interface KeyedWindowFunction<IN, OUT, KEY> extends Function, 
Serializable {
-
-       /**
-        * 
-        * @param key
-        * @param values
-        * @param out
-        * 
-        * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery. 
-        */
-       void evaluate(KEY key, Iterable<IN> values, Collector<OUT> out) throws 
Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 6c4e53a..a81340f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -181,7 +182,7 @@ public abstract class 
AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
        private void computeWindow(long timestamp) throws Exception {
                out.setTimestamp(timestamp);
                panes.truncatePanes(numPanesPerWindow);
-               panes.evaluateWindow(out);
+               panes.evaluateWindow(out, new TimeWindow(timestamp, 
windowSize));
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
index fae024b..07dea06 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
 import java.util.ArrayDeque;
@@ -33,7 +34,7 @@ public abstract class AbstractKeyedTimePanes<Type, Key, 
Aggregate, Result> {
 
        public abstract void addElementToLatestPane(Type element) throws 
Exception;
 
-       public abstract void evaluateWindow(Collector<Result> out) throws 
Exception;
+       public abstract void evaluateWindow(Collector<Result> out, TimeWindow 
window) throws Exception;
        
        
        public void dispose() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index d85c53e..496da6b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -20,7 +20,9 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.util.UnionIterator;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
 import java.util.ArrayList;
@@ -32,13 +34,13 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
 
        private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = 
getListFactory();
 
-       private final KeyedWindowFunction<Type, Result, Key> function;
+       private final KeyedWindowFunction<Type, Result, Key, Window> function;
        
        private long evaluationPass;
 
        // 
------------------------------------------------------------------------
        
-       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
KeyedWindowFunction<Type, Result, Key> function) {
+       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
KeyedWindowFunction<Type, Result, Key, Window> function) {
                this.keySelector = keySelector;
                this.function = function;
        }
@@ -53,16 +55,16 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
        }
 
        @Override
-       public void evaluateWindow(Collector<Result> out) throws Exception {
+       public void evaluateWindow(Collector<Result> out, TimeWindow window) 
throws Exception {
                if (previousPanes.isEmpty()) {
                        // optimized path for single pane case (tumbling window)
                        for (KeyMap.Entry<Key, ArrayList<Type>> entry : 
latestPane) {
-                               function.evaluate(entry.getKey(), 
entry.getValue(), out);
+                               function.evaluate(entry.getKey(), window, 
entry.getValue(), out);
                        }
                }
                else {
                        // general code path for multi-pane case
-                       WindowFunctionTraversal<Key, Type, Result> evaluator = 
new WindowFunctionTraversal<>(function, out);
+                       WindowFunctionTraversal<Key, Type, Result> evaluator = 
new WindowFunctionTraversal<>(function, window, out);
                        traverseAllPanes(evaluator, evaluationPass);
                }
                
@@ -75,7 +77,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
        
        static final class WindowFunctionTraversal<Key, Type, Result> 
implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
 
-               private final KeyedWindowFunction<Type, Result, Key> function;
+               private final KeyedWindowFunction<Type, Result, Key, Window> 
function;
                
                private final UnionIterator<Type> unionIterator;
                
@@ -83,10 +85,13 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
                
                private Key currentKey;
 
-               WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> 
function, Collector<Result> out) {
+               private TimeWindow window;
+
+               WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key, 
Window> function, TimeWindow window, Collector<Result> out) {
                        this.function = function;
                        this.out = out;
                        this.unionIterator = new UnionIterator<>();
+                       this.window = window;
                }
 
 
@@ -103,7 +108,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
 
                @Override
                public void keyDone() throws Exception {
-                       function.evaluate(currentKey, unionIterator, out);
+                       function.evaluate(currentKey, window, unionIterator, 
out);
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 4df308d..8edb76f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -20,7 +20,8 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 
 
 public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
@@ -30,7 +31,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, 
IN, OUT>
 
        
        public AccumulatingProcessingTimeWindowOperator(
-                       KeyedWindowFunction<IN, OUT, KEY> function,
+                       KeyedWindowFunction<IN, OUT, KEY, Window> function,
                        KeySelector<IN, KEY> keySelector,
                        long windowLength,
                        long windowSlide)
@@ -41,7 +42,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, 
IN, OUT>
        @Override
        protected AccumulatingKeyedTimePanes<IN, KEY, OUT> 
createPanes(KeySelector<IN, KEY> keySelector, Function function) {
                @SuppressWarnings("unchecked")
-               KeyedWindowFunction<IN, OUT, KEY> windowFunction = 
(KeyedWindowFunction<IN, OUT, KEY>) function;
+               KeyedWindowFunction<IN, OUT, KEY, Window> windowFunction = 
(KeyedWindowFunction<IN, OUT, KEY, Window>) function;
                
                return new AccumulatingKeyedTimePanes<>(keySelector, 
windowFunction);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
index 48f4eb1..c17f0b4 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
 
@@ -47,7 +48,7 @@ public class AggregatingKeyedTimePanes<Type, Key> extends 
AbstractKeyedTimePanes
        }
 
        @Override
-       public void evaluateWindow(Collector<Type> out) throws Exception {
+       public void evaluateWindow(Collector<Type> out, TimeWindow window) 
throws Exception {
                if (previousPanes.isEmpty()) {
                        // optimized path for single pane case
                        for (KeyMap.Entry<Key, Type> entry : latestPane) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
index b34d0bc..b1ff7e2 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
@@ -21,11 +21,12 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
 import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
 import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 
 /**
  * This class implements the conversion from window policies to concrete 
operator
@@ -60,7 +61,7 @@ public class PolicyToOperator {
                        }
                        else if (function instanceof KeyedWindowFunction) {
                                @SuppressWarnings("unchecked")
-                               KeyedWindowFunction<IN, OUT, KEY> wf = 
(KeyedWindowFunction<IN, OUT, KEY>) function;
+                               KeyedWindowFunction<IN, OUT, KEY, Window> wf = 
(KeyedWindowFunction<IN, OUT, KEY, Window>) function;
 
                                return new 
AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>(
                                                                wf, 
keySelector, windowLength, windowSlide);

http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index bcf02c5..4327e11 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -20,8 +20,9 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -48,7 +49,7 @@ import static org.junit.Assert.*;
 public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        @SuppressWarnings("unchecked")
-       private final KeyedWindowFunction<String, String, String> mockFunction 
= mock(KeyedWindowFunction.class);
+       private final KeyedWindowFunction<String, String, String, Window> 
mockFunction = mock(KeyedWindowFunction.class);
 
        @SuppressWarnings("unchecked")
        private final KeySelector<String, String> mockKeySelector = 
mock(KeySelector.class);
@@ -60,11 +61,11 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                }
        };
        
-       private final KeyedWindowFunction<Integer, Integer, Integer> 
validatingIdentityFunction = 
-                       new KeyedWindowFunction<Integer, Integer, Integer>()
+       private final KeyedWindowFunction<Integer, Integer, Integer, Window> 
validatingIdentityFunction =
+                       new KeyedWindowFunction<Integer, Integer, Integer, 
Window>()
        {
                @Override
-               public void evaluate(Integer key, Iterable<Integer> values, 
Collector<Integer> out) {
+               public void evaluate(Integer key, Window window, 
Iterable<Integer> values, Collector<Integer> out) {
                        for (Integer val : values) {
                                assertEquals(key, val);
                                out.collect(val);
@@ -471,7 +472,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        final StreamingRuntimeContext mockContext = 
mock(StreamingRuntimeContext.class);
                        when(mockContext.getTaskName()).thenReturn("Test task 
name");
 
-                       KeyedWindowFunction<Integer, Integer, Integer> 
failingFunction = new FailingFunction(100);
+                       KeyedWindowFunction<Integer, Integer, Integer, Window> 
failingFunction = new FailingFunction(100);
 
                        // the operator has a window time that is so long that 
it will not fire in this test
                        final long hundredYears = 100L * 365 * 24 * 60 * 60 * 
1000;
@@ -522,7 +523,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        // 
------------------------------------------------------------------------
        
-       private static class FailingFunction implements 
KeyedWindowFunction<Integer, Integer, Integer> {
+       private static class FailingFunction implements 
KeyedWindowFunction<Integer, Integer, Integer, Window> {
 
                private final int failAfterElements;
                
@@ -533,7 +534,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                }
 
                @Override
-               public void evaluate(Integer integer, Iterable<Integer> values, 
Collector<Integer> out) throws Exception {
+               public void evaluate(Integer integer, Window window, 
Iterable<Integer> values, Collector<Integer> out) throws Exception {
                        for (Integer i : values) {
                                out.collect(i);
                                numElements++;

http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index 0732b64..0c5cd8f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.util;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Assert;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -65,4 +68,18 @@ public class TestHarnessUtil {
                                actual.toArray());
 
        }
+
+       /**
+        * Compare the two queues containing operator/task output by converting 
them to arrays and sorting both with the given comparator first.
+        */
+       public static void assertOutputEqualsSorted(String message, 
Queue<Object> expected, Queue<Object> actual, Comparator<Object> comparator) {
+               Object[] sortedExpected = expected.toArray();
+               Object[] sortedActual = actual.toArray();
+
+               Arrays.sort(sortedExpected, comparator);
+               Arrays.sort(sortedActual, comparator);
+
+               Assert.assertArrayEquals(message, sortedExpected, sortedActual);
+
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index e52c2cb..6cc206a 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -26,8 +26,9 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
 import org.apache.flink.streaming.api.windowing.windowpolicy.Time;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -103,10 +104,10 @@ public class GroupedProcessingTimeWindowExample {
                }
        }
 
-       public static class SummingWindowFunction implements 
KeyedWindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
+       public static class SummingWindowFunction implements 
KeyedWindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
 
                @Override
-               public void evaluate(Long key, Iterable<Tuple2<Long, Long>> 
values, Collector<Tuple2<Long, Long>> out) {
+               public void evaluate(Long key, Window window, 
Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
                        long sum = 0L;
                        for (Tuple2<Long, Long> value : values) {
                                sum += value.f1;

Reply via email to