Repository: flink Updated Branches: refs/heads/master 501a9b085 -> dd51c9774
Add Window parameter to KeyedWindowFunction, move to package windowing Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6610caec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6610caec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6610caec Branch: refs/heads/master Commit: 6610caeca4093fced35d478ffc596f935912b4c1 Parents: bfaad37 Author: Aljoscha Krettek <[email protected]> Authored: Thu Sep 24 17:39:19 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Sep 28 17:04:16 2015 +0200 ---------------------------------------------------------------------- .../api/datastream/KeyedWindowDataStream.java | 5 ++- .../windowing/KeyedWindowFunction.java | 45 ++++++++++++++++++++ .../functions/windows/KeyedWindowFunction.java | 44 ------------------- ...ractAlignedProcessingTimeWindowOperator.java | 3 +- .../windowing/AbstractKeyedTimePanes.java | 3 +- .../windowing/AccumulatingKeyedTimePanes.java | 23 ++++++---- ...ccumulatingProcessingTimeWindowOperator.java | 7 +-- .../windowing/AggregatingKeyedTimePanes.java | 3 +- .../operators/windowing/PolicyToOperator.java | 5 ++- ...AlignedProcessingTimeWindowOperatorTest.java | 17 ++++---- .../flink/streaming/util/TestHarnessUtil.java | 17 ++++++++ .../GroupedProcessingTimeWindowExample.java | 7 +-- 12 files changed, 105 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java index 711a959..e658bdd 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java @@ -25,9 +25,10 @@ import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy; +import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator; /** @@ -107,7 +108,7 @@ public class KeyedWindowDataStream<T, K> { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<T, Result, K> function) { + public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<T, Result, K, Window> function) { String callLocation = Utils.getCallLocationName(); TypeInformation<T> inType = input.getType(); http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java new file mode 100644 index 0000000..77ce53e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +/** + * Base interface for functions that are evaluated over keyed (grouped) windows. + * + * @param <IN> The type of the input value. + * @param <OUT> The type of the output value. + * @param <KEY> The type of the key. + */ +public interface KeyedWindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable { + + /** + * + * @param key + * @param values + * @param out + * + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + void evaluate(KEY key, W window, Iterable<IN> values, Collector<OUT> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java deleted file mode 100644 index b4e55e4..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.functions.windows; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.util.Collector; - -import java.io.Serializable; - -/** - * Base interface for functions that are evaluated over keyed (grouped) windows. - * - * @param <IN> The type of the input value. - * @param <OUT> The type of the output value. - * @param <KEY> The type of the key. - */ -public interface KeyedWindowFunction<IN, OUT, KEY> extends Function, Serializable { - - /** - * - * @param key - * @param values - * @param out - * - * @throws Exception The function may throw exceptions to fail the program and trigger recovery. - */ - void evaluate(KEY key, Iterable<IN> values, Collector<OUT> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java index 6c4e53a..a81340f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -181,7 +182,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> private void computeWindow(long timestamp) throws Exception { out.setTimestamp(timestamp); panes.truncatePanes(numPanesPerWindow); - panes.evaluateWindow(out); + panes.evaluateWindow(out, new TimeWindow(timestamp, windowSize)); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java index fae024b..07dea06 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.operators.windowing; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.ArrayDeque; @@ -33,7 +34,7 @@ public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> { public abstract void addElementToLatestPane(Type element) throws Exception; - public abstract void evaluateWindow(Collector<Result> out) throws Exception; + public abstract void evaluateWindow(Collector<Result> out, TimeWindow window) throws Exception; public void dispose() { http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java index d85c53e..496da6b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java @@ -20,7 +20,9 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.util.UnionIterator; -import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; import java.util.ArrayList; @@ -32,13 +34,13 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory(); - private final KeyedWindowFunction<Type, Result, Key> function; + private final KeyedWindowFunction<Type, Result, Key, Window> function; private long evaluationPass; // ------------------------------------------------------------------------ - public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key> function) { + public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key, Window> function) { this.keySelector = keySelector; this.function = function; } @@ -53,16 +55,16 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed } @Override - public void evaluateWindow(Collector<Result> out) throws Exception { + public void evaluateWindow(Collector<Result> out, TimeWindow window) throws Exception { if (previousPanes.isEmpty()) { // optimized path for single pane case (tumbling window) for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) { - function.evaluate(entry.getKey(), entry.getValue(), out); + function.evaluate(entry.getKey(), window, entry.getValue(), out); } } else { // general code path for multi-pane case - WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(function, out); + WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(function, window, out); traverseAllPanes(evaluator, evaluationPass); } @@ -75,7 +77,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> { - private final KeyedWindowFunction<Type, Result, Key> function; + private final KeyedWindowFunction<Type, Result, Key, Window> function; private final UnionIterator<Type> unionIterator; @@ -83,10 +85,13 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed private Key currentKey; - WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> function, Collector<Result> out) { + private TimeWindow window; + + WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key, Window> function, TimeWindow window, Collector<Result> out) { this.function = function; this.out = out; this.unionIterator = new UnionIterator<>(); + this.window = window; } @@ -103,7 +108,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed @Override public void keyDone() throws Exception { - function.evaluate(currentKey, unionIterator, out); + function.evaluate(currentKey, window, unionIterator, out); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java index 4df308d..8edb76f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java @@ -20,7 +20,8 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> @@ -30,7 +31,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> public AccumulatingProcessingTimeWindowOperator( - KeyedWindowFunction<IN, OUT, KEY> function, + KeyedWindowFunction<IN, OUT, KEY, Window> function, KeySelector<IN, KEY> keySelector, long windowLength, long windowSlide) @@ -41,7 +42,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> @Override protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) { @SuppressWarnings("unchecked") - KeyedWindowFunction<IN, OUT, KEY> windowFunction = (KeyedWindowFunction<IN, OUT, KEY>) function; + KeyedWindowFunction<IN, OUT, KEY, Window> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, Window>) function; return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction); } http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java index 48f4eb1..c17f0b4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; @@ -47,7 +48,7 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes } @Override - public void evaluateWindow(Collector<Type> out) throws Exception { + public void evaluateWindow(Collector<Type> out, TimeWindow window) throws Exception { if (previousPanes.isEmpty()) { // optimized path for single pane case for (KeyMap.Entry<Key, Type> entry : latestPane) { http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java index b34d0bc..b1ff7e2 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java @@ -21,11 +21,12 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime; import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime; import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy; +import org.apache.flink.streaming.api.windowing.windows.Window; /** * This class implements the conversion from window policies to concrete operator @@ -60,7 +61,7 @@ public class PolicyToOperator { } else if (function instanceof KeyedWindowFunction) { @SuppressWarnings("unchecked") - KeyedWindowFunction<IN, OUT, KEY> wf = (KeyedWindowFunction<IN, OUT, KEY>) function; + KeyedWindowFunction<IN, OUT, KEY, Window> wf = (KeyedWindowFunction<IN, OUT, KEY, Window>) function; return new AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>( wf, keySelector, windowLength, windowSlide); http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index bcf02c5..4327e11 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -20,8 +20,9 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -48,7 +49,7 @@ import static org.junit.Assert.*; public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @SuppressWarnings("unchecked") - private final KeyedWindowFunction<String, String, String> mockFunction = mock(KeyedWindowFunction.class); + private final KeyedWindowFunction<String, String, String, Window> mockFunction = mock(KeyedWindowFunction.class); @SuppressWarnings("unchecked") private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class); @@ -60,11 +61,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } }; - private final KeyedWindowFunction<Integer, Integer, Integer> validatingIdentityFunction = - new KeyedWindowFunction<Integer, Integer, Integer>() + private final KeyedWindowFunction<Integer, Integer, Integer, Window> validatingIdentityFunction = + new KeyedWindowFunction<Integer, Integer, Integer, Window>() { @Override - public void evaluate(Integer key, Iterable<Integer> values, Collector<Integer> out) { + public void evaluate(Integer key, Window window, Iterable<Integer> values, Collector<Integer> out) { for (Integer val : values) { assertEquals(key, val); out.collect(val); @@ -471,7 +472,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); when(mockContext.getTaskName()).thenReturn("Test task name"); - KeyedWindowFunction<Integer, Integer, Integer> failingFunction = new FailingFunction(100); + KeyedWindowFunction<Integer, Integer, Integer, Window> failingFunction = new FailingFunction(100); // the operator has a window time that is so long that it will not fire in this test final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000; @@ -522,7 +523,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { // ------------------------------------------------------------------------ - private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer> { + private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer, Window> { private final int failAfterElements; @@ -533,7 +534,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } @Override - public void evaluate(Integer integer, Iterable<Integer> values, Collector<Integer> out) throws Exception { + public void evaluate(Integer integer, Window window, Iterable<Integer> values, Collector<Integer> out) throws Exception { for (Integer i : values) { out.collect(i); numElements++; http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java index 0732b64..0c5cd8f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java @@ -20,6 +20,9 @@ package org.apache.flink.streaming.util; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.junit.Assert; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -65,4 +68,18 @@ public class TestHarnessUtil { actual.toArray()); } + + /** + * Compare the two queues containing operator/task output by converting them to an array first. + */ + public static void assertOutputEqualsSorted(String message, Queue<Object> expected, Queue<Object> actual, Comparator<Object> comparator) { + Object[] sortedExpected = expected.toArray(); + Object[] sortedActual = actual.toArray(); + + Arrays.sort(sortedExpected, comparator); + Arrays.sort(sortedActual, comparator); + + Assert.assertArrayEquals(message, sortedExpected, sortedActual); + + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6610caec/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java index e52c2cb..6cc206a 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java @@ -26,8 +26,9 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction; import org.apache.flink.streaming.api.windowing.windowpolicy.Time; +import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -103,10 +104,10 @@ public class GroupedProcessingTimeWindowExample { } } - public static class SummingWindowFunction implements KeyedWindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> { + public static class SummingWindowFunction implements KeyedWindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> { @Override - public void evaluate(Long key, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) { + public void evaluate(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) { long sum = 0L; for (Tuple2<Long, Long> value : values) { sum += value.f1;
