http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java deleted file mode 100644 index ec8cda8..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java +++ /dev/null @@ -1,519 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators.windowing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.WindowMapFunction; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.windowing.StreamWindow; -import org.apache.flink.streaming.api.windowing.helper.Count; -import org.apache.flink.streaming.api.windowing.helper.FullStream; -import org.apache.flink.streaming.api.windowing.helper.Time; -import org.apache.flink.streaming.api.windowing.helper.Timestamp; -import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.util.Collector; -import org.junit.Test; - -public class WindowIntegrationTest implements Serializable { - - private static final long serialVersionUID = 1L; - private static final Integer MEMORYSIZE = 32; - - @SuppressWarnings("serial") - public static class ModKey implements KeySelector<Integer, Integer> { - private int m; - - public ModKey(int m) { - this.m = m; - } - - @Override - public Integer getKey(Integer value) throws Exception { - return value % m; - } - } - - @SuppressWarnings("serial") - public static class IdentityWindowMap implements - WindowMapFunction<Integer, StreamWindow<Integer>> { - - @Override - public void mapWindow(Iterable<Integer> values, Collector<StreamWindow<Integer>> out) - throws Exception { - - StreamWindow<Integer> window = new StreamWindow<Integer>(); - - for (Integer value : values) { - window.add(value); - } - out.collect(window); - } - - } - - @SuppressWarnings("serial") - @Test - public void test() throws Exception { - - List<Integer> inputs = new ArrayList<Integer>(); - inputs.add(1); - inputs.add(2); - inputs.add(2); - inputs.add(3); - inputs.add(4); - inputs.add(5); - inputs.add(10); - inputs.add(11); - inputs.add(11); - - KeySelector<Integer, ?> key = new ModKey(2); - - Timestamp<Integer> ts = new Timestamp<Integer>() { - - private static final long serialVersionUID = 1L; - - @Override - public long getTimestamp(Integer value) { - return value; - } - }; - - StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE); - env.disableOperatorChaining(); - - DataStream<Integer> source = env.fromCollection(inputs); - - source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream() - .addSink(new TestSink1()); - - source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap()) - .flatten().addSink(new TestSink2()); - - source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream() - .addSink(new TestSink4()); - - source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2)) - .mapWindow(new IdentityWindowMap()).flatten().addSink(new TestSink5()); - - source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream() - .addSink(new TestSink3()); - - source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream() - .addSink(new TestSink6()); - - source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten() - .addSink(new TestSink7()); - - source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0) - .getDiscretizedStream().addSink(new TestSink8()); - - try { - source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream(); - fail(); - } catch (Exception e) { - } - try { - source.window(FullStream.window()).getDiscretizedStream(); - fail(); - } catch (Exception e) { - } - try { - source.every(Count.of(5)).mapWindow(new IdentityWindowMap()).getDiscretizedStream(); - fail(); - } catch (Exception e) { - } - - source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11()); - - source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0) - .getDiscretizedStream().addSink(new TestSink12()); - - DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() { - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - for (int i = 1; i <= 10; i++) { - ctx.collect(i); - } - } - - @Override - public void cancel() { - } - }); - - DataStream<Integer> source3 = env.addSource(new RichParallelSourceFunction<Integer>() { - private static final long serialVersionUID = 1L; - - private int i = 1; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - i = 1 + getRuntimeContext().getIndexOfThisSubtask(); - } - - @Override - public void cancel() { - } - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - for (;i < 11; i += 2) { - ctx.collect(i); - } - - } - }); - - source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9()); - - source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream() - .addSink(new TestSink10()); - - source.map(new MapFunction<Integer, Integer>() { - @Override - public Integer map(Integer value) throws Exception { - return value; - } - }).every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink13()); - - env.execute(); - - // sum ( Time of 3 slide 2 ) - List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>(); - expected1.add(StreamWindow.fromElements(5)); - expected1.add(StreamWindow.fromElements(11)); - expected1.add(StreamWindow.fromElements(9)); - expected1.add(StreamWindow.fromElements(10)); - expected1.add(StreamWindow.fromElements(32)); - - validateOutput(expected1, TestSink1.windows); - - // Tumbling Time of 4 grouped by mod 2 - List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>(); - expected2.add(StreamWindow.fromElements(2, 2, 4)); - expected2.add(StreamWindow.fromElements(1, 3)); - expected2.add(StreamWindow.fromElements(5)); - expected2.add(StreamWindow.fromElements(10)); - expected2.add(StreamWindow.fromElements(11, 11)); - - validateOutput(expected2, TestSink2.windows); - - // groupby mod 2 sum ( Tumbling Time of 4) - List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>(); - expected3.add(StreamWindow.fromElements(4)); - expected3.add(StreamWindow.fromElements(5)); - expected3.add(StreamWindow.fromElements(22)); - expected3.add(StreamWindow.fromElements(8)); - expected3.add(StreamWindow.fromElements(10)); - - validateOutput(expected3, TestSink4.windows); - - // groupby mod3 Tumbling Count of 2 grouped by mod 2 - List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>(); - expected4.add(StreamWindow.fromElements(2, 2)); - expected4.add(StreamWindow.fromElements(1)); - expected4.add(StreamWindow.fromElements(4)); - expected4.add(StreamWindow.fromElements(5, 11)); - expected4.add(StreamWindow.fromElements(10)); - expected4.add(StreamWindow.fromElements(11)); - expected4.add(StreamWindow.fromElements(3)); - - validateOutput(expected4, TestSink5.windows); - - // min ( Time of 2 slide 3 ) - List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>(); - expected5.add(StreamWindow.fromElements(1)); - expected5.add(StreamWindow.fromElements(4)); - expected5.add(StreamWindow.fromElements(10)); - - validateOutput(expected5, TestSink3.windows); - - // groupby mod 2 max ( Tumbling Time of 4) - List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>(); - expected6.add(StreamWindow.fromElements(3)); - expected6.add(StreamWindow.fromElements(5)); - expected6.add(StreamWindow.fromElements(11)); - expected6.add(StreamWindow.fromElements(4)); - expected6.add(StreamWindow.fromElements(10)); - - validateOutput(expected6, TestSink6.windows); - - List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>(); - expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5)); - expected7.add(StreamWindow.fromElements(10)); - expected7.add(StreamWindow.fromElements(10, 11, 11)); - - validateOutput(expected7, TestSink7.windows); - - List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>(); - expected8.add(StreamWindow.fromElements(4, 8)); - expected8.add(StreamWindow.fromElements(4, 5)); - expected8.add(StreamWindow.fromElements(10, 22)); - - for (List<Integer> sw : TestSink8.windows) { - Collections.sort(sw); - } - - validateOutput(expected8, TestSink8.windows); - - List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>(); - expected9.add(StreamWindow.fromElements(6)); - expected9.add(StreamWindow.fromElements(14)); - expected9.add(StreamWindow.fromElements(22)); - expected9.add(StreamWindow.fromElements(30)); - expected9.add(StreamWindow.fromElements(38)); - - validateOutput(expected9, TestSink9.windows); - - List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>(); - expected10.add(StreamWindow.fromElements(6, 9)); - expected10.add(StreamWindow.fromElements(16, 24)); - - for (List<Integer> sw : TestSink10.windows) { - Collections.sort(sw); - } - - validateOutput(expected10, TestSink10.windows); - - List<StreamWindow<Integer>> expected11 = new ArrayList<StreamWindow<Integer>>(); - expected11.add(StreamWindow.fromElements(8)); - expected11.add(StreamWindow.fromElements(38)); - expected11.add(StreamWindow.fromElements(49)); - - for (List<Integer> sw : TestSink11.windows) { - Collections.sort(sw); - } - - validateOutput(expected11, TestSink11.windows); - - List<StreamWindow<Integer>> expected12 = new ArrayList<StreamWindow<Integer>>(); - expected12.add(StreamWindow.fromElements(4, 4)); - expected12.add(StreamWindow.fromElements(18, 20)); - expected12.add(StreamWindow.fromElements(18, 31)); - - for (List<Integer> sw : TestSink12.windows) { - Collections.sort(sw); - } - - validateOutput(expected12, TestSink12.windows); - - List<StreamWindow<Integer>> expected13 = new ArrayList<StreamWindow<Integer>>(); - expected13.add(StreamWindow.fromElements(17)); - expected13.add(StreamWindow.fromElements(27)); - expected13.add(StreamWindow.fromElements(49)); - - for (List<Integer> sw : TestSink13.windows) { - Collections.sort(sw); - } - - validateOutput(expected13, TestSink13.windows); - - } - - public static <R> void validateOutput(List<R> expected, List<R> actual) { - assertEquals(new HashSet<R>(expected), new HashSet<R>(actual)); - } - - @SuppressWarnings("serial") - private static class TestSink1 implements SinkFunction<StreamWindow<Integer>> { - - public static List<StreamWindow<Integer>> windows = Collections - .synchronizedList(new ArrayList<StreamWindow<Integer>>()); - - @Override - public void invoke(StreamWindow<Integer> value) throws Exception { - windows.add(value); - } - - } - - @SuppressWarnings("serial") - private static class TestSink2 implements SinkFunction<StreamWindow<Integer>> { - - public static List<StreamWindow<Integer>> windows = Collections - .synchronizedList(new ArrayList<StreamWindow<Integer>>()); - - @Override - public void invoke(StreamWindow<Integer> value) throws Exception { - windows.add(value); - } - - } - - @SuppressWarnings("serial") - private static class TestSink3 implements SinkFunction<StreamWindow<Integer>> { - - public static List<StreamWindow<Integer>> windows = Collections - .synchronizedList(new ArrayList<StreamWindow<Integer>>()); - - @Override - public void invoke(StreamWindow<Integer> value) throws Exception { - windows.add(value); - } - - } - - @SuppressWarnings("serial") - private static class TestSink4 implements SinkFunction<StreamWindow<Integer>> { - - public static List<StreamWindow<Integer>> windows = Collections - .synchronizedList(new ArrayList<StreamWindow<Integer>>()); - - @Override - public void invoke(StreamWindow<Integer> value) throws Exception { - windows.add(value); - } - - } - - @SuppressWarnings("serial") - private static class TestSink5 implements SinkFunction<StreamWindow<Integer>> { - - public static List<StreamWindow<Integer>> windows = Collections - .synchronizedList(new ArrayList<StreamWindow<Integer>>()); - - @Override - public void invoke(StreamWindow<Integer> value) throws Exception { - windows.add(value); - } - - } - - @SuppressWarnings("serial") - private static class TestSink6 implements SinkFunction<StreamWindow<Integer>> { - - public static List<StreamWindow<Integer>> windows = Collections - .synchronizedList(new ArrayList<StreamWindow<Integer>>()); - - @Override - public void invoke(StreamWindow<Integer> value) throws Exception { - windows.add(value); - } - - } - - @SuppressWarnings("serial") - private static class TestSink7 implements SinkFunction<StreamWindow<Integer>> { - - public static List<StreamWindow<Integer>> windows = Collections - .synchronizedList(new ArrayList<StreamWindow<Integer>>()); - - @Override - public void invoke(StreamWindow<Integer> value) throws Exception { - windows.add(value); - } - - } - - @SuppressWarnings("serial") - private static class TestSink8 implements SinkFunction<StreamWindow<Integer>> { - - public static List<StreamWindow<Integer>> windows = Collections - .synchronizedList(new ArrayList<StreamWindow<Integer>>()); - - @Override - public void invoke(StreamWindow<Integer> value) throws Exception { - windows.add(value); - } - - } - - @SuppressWarnings("serial") - private static class TestSink9 implements SinkFunction<StreamWindow<Integer>> { - - public static List<StreamWindow<Integer>> windows = Collections - .synchronizedList(new ArrayList<StreamWindow<Integer>>()); - - @Override - public void invoke(StreamWindow<Integer> value) throws Exception { - windows.add(value); - } - - } - - @SuppressWarnings("serial") - private static class TestSink10 implements SinkFunction<StreamWindow<Integer>> { - - public static List<StreamWindow<Integer>> windows = Collections - .synchronizedList(new ArrayList<StreamWindow<Integer>>()); - - @Override - public void invoke(StreamWindow<Integer> value) throws Exception { - windows.add(value); - } - - } - - @SuppressWarnings("serial") - private static class TestSink11 implements SinkFunction<StreamWindow<Integer>> { - - public static List<StreamWindow<Integer>> windows = Collections - .synchronizedList(new ArrayList<StreamWindow<Integer>>()); - - @Override - public void invoke(StreamWindow<Integer> value) throws Exception { - windows.add(value); - } - - } - - @SuppressWarnings("serial") - private static class TestSink12 implements SinkFunction<StreamWindow<Integer>> { - - public static List<StreamWindow<Integer>> windows = Collections - .synchronizedList(new ArrayList<StreamWindow<Integer>>()); - - @Override - public void invoke(StreamWindow<Integer> value) throws Exception { - windows.add(value); - } - - } - - @SuppressWarnings("serial") - private static class TestSink13 implements SinkFunction<StreamWindow<Integer>> { - - public static List<StreamWindow<Integer>> windows = Collections - .synchronizedList(new ArrayList<StreamWindow<Integer>>()); - - @Override - public void invoke(StreamWindow<Integer> value) throws Exception { - windows.add(value); - } - - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java new file mode 100644 index 0000000..5e6ffa2 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.WindowMapFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.windowing.StreamWindow; +import org.apache.flink.streaming.api.windowing.helper.Count; +import org.apache.flink.streaming.api.windowing.helper.FullStream; +import org.apache.flink.streaming.api.windowing.helper.Time; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.util.Collector; +import org.junit.Test; + +public class WindowingITCase implements Serializable { + + private static final long serialVersionUID = 1L; + private static final Integer MEMORYSIZE = 32; + + @SuppressWarnings("serial") + public static class ModKey implements KeySelector<Integer, Integer> { + private int m; + + public ModKey(int m) { + this.m = m; + } + + @Override + public Integer getKey(Integer value) throws Exception { + return value % m; + } + } + + @SuppressWarnings("serial") + public static class IdentityWindowMap implements + WindowMapFunction<Integer, StreamWindow<Integer>> { + + @Override + public void mapWindow(Iterable<Integer> values, Collector<StreamWindow<Integer>> out) + throws Exception { + + StreamWindow<Integer> window = new StreamWindow<Integer>(); + + for (Integer value : values) { + window.add(value); + } + out.collect(window); + } + + } + + @SuppressWarnings("serial") + @Test + public void test() throws Exception { + + List<Integer> inputs = new ArrayList<Integer>(); + inputs.add(1); + inputs.add(2); + inputs.add(2); + inputs.add(3); + inputs.add(4); + inputs.add(5); + inputs.add(10); + inputs.add(11); + inputs.add(11); + + KeySelector<Integer, ?> key = new ModKey(2); + + Timestamp<Integer> ts = new Timestamp<Integer>() { + + private static final long serialVersionUID = 1L; + + @Override + public long getTimestamp(Integer value) { + return value; + } + }; + + StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE); + env.disableOperatorChaining(); + + DataStream<Integer> source = env.fromCollection(inputs); + + source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream() + .addSink(new TestSink1()); + + source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap()) + .flatten().addSink(new TestSink2()); + + source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream() + .addSink(new TestSink4()); + + source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2)) + .mapWindow(new IdentityWindowMap()).flatten().addSink(new TestSink5()); + + source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream() + .addSink(new TestSink3()); + + source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream() + .addSink(new TestSink6()); + + source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten() + .addSink(new TestSink7()); + + source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0) + .getDiscretizedStream().addSink(new TestSink8()); + + try { + source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream(); + fail(); + } catch (Exception e) { + } + try { + source.window(FullStream.window()).getDiscretizedStream(); + fail(); + } catch (Exception e) { + } + try { + source.every(Count.of(5)).mapWindow(new IdentityWindowMap()).getDiscretizedStream(); + fail(); + } catch (Exception e) { + } + + source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11()); + + source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0) + .getDiscretizedStream().addSink(new TestSink12()); + + DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + for (int i = 1; i <= 10; i++) { + ctx.collect(i); + } + } + + @Override + public void cancel() { + } + }); + + DataStream<Integer> source3 = env.addSource(new RichParallelSourceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + private int i = 1; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + i = 1 + getRuntimeContext().getIndexOfThisSubtask(); + } + + @Override + public void cancel() { + } + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + for (;i < 11; i += 2) { + ctx.collect(i); + } + + } + }); + + source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9()); + + source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream() + .addSink(new TestSink10()); + + source.map(new MapFunction<Integer, Integer>() { + @Override + public Integer map(Integer value) throws Exception { + return value; + } + }).every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink13()); + + env.execute(); + + // sum ( Time of 3 slide 2 ) + List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>(); + expected1.add(StreamWindow.fromElements(5)); + expected1.add(StreamWindow.fromElements(11)); + expected1.add(StreamWindow.fromElements(9)); + expected1.add(StreamWindow.fromElements(10)); + expected1.add(StreamWindow.fromElements(32)); + + validateOutput(expected1, TestSink1.windows); + + // Tumbling Time of 4 grouped by mod 2 + List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>(); + expected2.add(StreamWindow.fromElements(2, 2, 4)); + expected2.add(StreamWindow.fromElements(1, 3)); + expected2.add(StreamWindow.fromElements(5)); + expected2.add(StreamWindow.fromElements(10)); + expected2.add(StreamWindow.fromElements(11, 11)); + + validateOutput(expected2, TestSink2.windows); + + // groupby mod 2 sum ( Tumbling Time of 4) + List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>(); + expected3.add(StreamWindow.fromElements(4)); + expected3.add(StreamWindow.fromElements(5)); + expected3.add(StreamWindow.fromElements(22)); + expected3.add(StreamWindow.fromElements(8)); + expected3.add(StreamWindow.fromElements(10)); + + validateOutput(expected3, TestSink4.windows); + + // groupby mod3 Tumbling Count of 2 grouped by mod 2 + List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>(); + expected4.add(StreamWindow.fromElements(2, 2)); + expected4.add(StreamWindow.fromElements(1)); + expected4.add(StreamWindow.fromElements(4)); + expected4.add(StreamWindow.fromElements(5, 11)); + expected4.add(StreamWindow.fromElements(10)); + expected4.add(StreamWindow.fromElements(11)); + expected4.add(StreamWindow.fromElements(3)); + + validateOutput(expected4, TestSink5.windows); + + // min ( Time of 2 slide 3 ) + List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>(); + expected5.add(StreamWindow.fromElements(1)); + expected5.add(StreamWindow.fromElements(4)); + expected5.add(StreamWindow.fromElements(10)); + + validateOutput(expected5, TestSink3.windows); + + // groupby mod 2 max ( Tumbling Time of 4) + List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>(); + expected6.add(StreamWindow.fromElements(3)); + expected6.add(StreamWindow.fromElements(5)); + expected6.add(StreamWindow.fromElements(11)); + expected6.add(StreamWindow.fromElements(4)); + expected6.add(StreamWindow.fromElements(10)); + + validateOutput(expected6, TestSink6.windows); + + List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>(); + expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5)); + expected7.add(StreamWindow.fromElements(10)); + expected7.add(StreamWindow.fromElements(10, 11, 11)); + + validateOutput(expected7, TestSink7.windows); + + List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>(); + expected8.add(StreamWindow.fromElements(4, 8)); + expected8.add(StreamWindow.fromElements(4, 5)); + expected8.add(StreamWindow.fromElements(10, 22)); + + for (List<Integer> sw : TestSink8.windows) { + Collections.sort(sw); + } + + validateOutput(expected8, TestSink8.windows); + + List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>(); + expected9.add(StreamWindow.fromElements(6)); + expected9.add(StreamWindow.fromElements(14)); + expected9.add(StreamWindow.fromElements(22)); + expected9.add(StreamWindow.fromElements(30)); + expected9.add(StreamWindow.fromElements(38)); + + validateOutput(expected9, TestSink9.windows); + + List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>(); + expected10.add(StreamWindow.fromElements(6, 9)); + expected10.add(StreamWindow.fromElements(16, 24)); + + for (List<Integer> sw : TestSink10.windows) { + Collections.sort(sw); + } + + validateOutput(expected10, TestSink10.windows); + + List<StreamWindow<Integer>> expected11 = new ArrayList<StreamWindow<Integer>>(); + expected11.add(StreamWindow.fromElements(8)); + expected11.add(StreamWindow.fromElements(38)); + expected11.add(StreamWindow.fromElements(49)); + + for (List<Integer> sw : TestSink11.windows) { + Collections.sort(sw); + } + + validateOutput(expected11, TestSink11.windows); + + List<StreamWindow<Integer>> expected12 = new ArrayList<StreamWindow<Integer>>(); + expected12.add(StreamWindow.fromElements(4, 4)); + expected12.add(StreamWindow.fromElements(18, 20)); + expected12.add(StreamWindow.fromElements(18, 31)); + + for (List<Integer> sw : TestSink12.windows) { + Collections.sort(sw); + } + + validateOutput(expected12, TestSink12.windows); + + List<StreamWindow<Integer>> expected13 = new ArrayList<StreamWindow<Integer>>(); + expected13.add(StreamWindow.fromElements(17)); + expected13.add(StreamWindow.fromElements(27)); + expected13.add(StreamWindow.fromElements(49)); + + for (List<Integer> sw : TestSink13.windows) { + Collections.sort(sw); + } + + validateOutput(expected13, TestSink13.windows); + + } + + public static <R> void validateOutput(List<R> expected, List<R> actual) { + assertEquals(new HashSet<R>(expected), new HashSet<R>(actual)); + } + + @SuppressWarnings("serial") + private static class TestSink1 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } + + @SuppressWarnings("serial") + private static class TestSink2 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } + + @SuppressWarnings("serial") + private static class TestSink3 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } + + @SuppressWarnings("serial") + private static class TestSink4 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } + + @SuppressWarnings("serial") + private static class TestSink5 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } + + @SuppressWarnings("serial") + private static class TestSink6 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } + + @SuppressWarnings("serial") + private static class TestSink7 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } + + @SuppressWarnings("serial") + private static class TestSink8 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } + + @SuppressWarnings("serial") + private static class TestSink9 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } + + @SuppressWarnings("serial") + private static class TestSink10 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } + + @SuppressWarnings("serial") + private static class TestSink11 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } + + @SuppressWarnings("serial") + private static class TestSink12 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } + + @SuppressWarnings("serial") + private static class TestSink13 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java index eb49e26..6e22021 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java @@ -48,6 +48,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.util.InstantiationUtil; @@ -103,12 +105,13 @@ public class StatefulOperatorTest { @Test public void apiTest() throws Exception { StreamExecutionEnvironment env = new TestStreamEnvironment(3, 32); - + KeyedDataStream<Integer> keyedStream = env.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).keyBy(new ModKey(4)); keyedStream.map(new StatefulMapper()).addSink(new SinkFunction<String>() { private static final long serialVersionUID = 1L; - public void invoke(String value) throws Exception {} + public void invoke(String value) throws Exception { + } }); keyedStream.map(new StatefulMapper2()).setParallelism(1).addSink(new SinkFunction<String>() { @@ -128,8 +131,8 @@ public class StatefulOperatorTest { private void processInputs(StreamMap<Integer, ?> map, List<Integer> input) throws Exception { for (Integer i : input) { - map.getRuntimeContext().setNextInput(i); - map.processElement(i); + map.getRuntimeContext().setNextInput(new StreamRecord<Integer>(i, 0L)); + map.processElement(new StreamRecord<Integer>(i, 0L)); } } @@ -144,11 +147,16 @@ public class StatefulOperatorTest { StreamMap<Integer, String> op = new StreamMap<Integer, String>(new StatefulMapper()); - op.setup(new Output<String>() { + op.setup(new Output<StreamRecord<String>>() { @Override - public void collect(String record) { - outputList.add(record); + public void collect(StreamRecord<String> record) { + outputList.add(record.getValue()); + } + + @Override + public void emitWatermark(Watermark mark) { + } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java index 4ac7fda..317a21c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java @@ -40,6 +40,6 @@ public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamR @Override public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) { - emittedRecords.add(record.getInstance().getObject().f0); + emittedRecords.add(record.getInstance().getValue().f0); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java index 967c719..6bc0e30 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java @@ -22,10 +22,10 @@ import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.List; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.StreamWindow; -import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer; -import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; -import org.apache.flink.util.Collector; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.junit.Test; public class BasicWindowBufferTest { @@ -33,7 +33,7 @@ public class BasicWindowBufferTest { @Test public void testEmitWindow() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); List<StreamWindow<Integer>> collected = collector.getCollected(); WindowBuffer<Integer> wb = new BasicWindowBuffer<Integer>(); @@ -60,13 +60,13 @@ public class BasicWindowBufferTest { assertEquals(2, collected.size()); } - public static class TestCollector<T> implements Collector<T> { + public static class TestOutput<T> implements Output<StreamRecord<T>> { private final List<T> collected = new ArrayList<T>(); @Override - public void collect(T record) { - collected.add(record); + public void collect(StreamRecord<T> record) { + collected.add(record.getValue()); } @Override @@ -77,6 +77,10 @@ public class BasicWindowBufferTest { return collected; } + @Override + public void emitWatermark(Watermark mark) { + + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java index c91910b..8430499 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java @@ -32,8 +32,9 @@ import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.windowing.StreamWindow; -import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector; import org.apache.flink.streaming.util.keys.KeySelectorUtil; +import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput; + import org.junit.Test; public class JumpingCountGroupedPreReducerTest { @@ -58,7 +59,7 @@ public class JumpingCountGroupedPreReducerTest { inputs.add(new Tuple2<Integer, Integer>(1, -2)); inputs.add(new Tuple2<Integer, Integer>(100, -200)); - TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>(); + TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>(); List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected(); WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>( @@ -109,7 +110,7 @@ public class JumpingCountGroupedPreReducerTest { inputs.add(new Tuple2<Integer, Integer>(1, -2)); inputs.add(new Tuple2<Integer, Integer>(100, -200)); - TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>(); + TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>(); List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected(); WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>( http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java index ba890ab..2279264 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java @@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.windowing.StreamWindow; -import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector; +import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput; import org.junit.Test; public class JumpingCountPreReducerTest { @@ -48,7 +48,7 @@ public class JumpingCountPreReducerTest { inputs.add(new Tuple2<Integer, Integer>(4, -2)); inputs.add(new Tuple2<Integer, Integer>(5, -3)); - TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>(); + TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>(); List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected(); WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountPreReducer<Tuple2<Integer, Integer>>( http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java index 5b693e7..ce312d3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java @@ -27,7 +27,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.windowing.StreamWindow; import org.apache.flink.streaming.api.windowing.helper.Timestamp; import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; -import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector; +import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput; import org.junit.Test; public class JumpingTimePreReducerTest { @@ -39,7 +39,7 @@ public class JumpingTimePreReducerTest { @Test public void testEmitWindow() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); List<StreamWindow<Integer>> collected = collector.getCollected(); WindowBuffer<Integer> wb = new JumpingTimePreReducer<Integer>( http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java index 377bdb5..7f58527 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java @@ -1,34 +1,35 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ package org.apache.flink.streaming.api.windowing.windowbuffer; -import static org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducerTest.checkResults; +import static org.junit.Assert.assertEquals; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.operators.windowing.WindowIntegrationTest; +import org.apache.flink.streaming.api.operators.windowing.WindowingITCase; import org.apache.flink.streaming.api.windowing.StreamWindow; -import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector; +import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput; import org.junit.Test; public class SlidingCountGroupedPreReducerTest { @@ -37,11 +38,11 @@ public class SlidingCountGroupedPreReducerTest { ReduceFunction<Integer> reducer = new SumReducer(); - KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2); + KeySelector<Integer, ?> key = new WindowingITCase.ModKey(2); @Test public void testPreReduce1() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>( reducer, serializer, key, 3, 2, 0); @@ -84,7 +85,7 @@ public class SlidingCountGroupedPreReducerTest { @Test public void testPreReduce2() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>( reducer, serializer, key, 5, 2, 0); @@ -126,7 +127,7 @@ public class SlidingCountGroupedPreReducerTest { @Test public void testPreReduce3() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>( reducer, serializer, key, 6, 3, 0); @@ -163,7 +164,7 @@ public class SlidingCountGroupedPreReducerTest { @Test public void testPreReduce4() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>( reducer, serializer, key, 5, 1, 2); @@ -217,4 +218,18 @@ public class SlidingCountGroupedPreReducerTest { } + + protected static void checkResults(List<StreamWindow<Integer>> expected, + List<StreamWindow<Integer>> actual) { + + for (StreamWindow<Integer> sw : expected) { + Collections.sort(sw); + } + + for (StreamWindow<Integer> sw : actual) { + Collections.sort(sw); + } + + assertEquals(expected, actual); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java index 3ce65f1..156b875 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java @@ -26,7 +26,7 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.windowing.StreamWindow; -import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector; +import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput; import org.junit.Test; public class SlidingCountPreReducerTest { @@ -37,7 +37,7 @@ public class SlidingCountPreReducerTest { @Test public void testPreReduce1() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer, serializer, 3, 2, 0); @@ -80,7 +80,7 @@ public class SlidingCountPreReducerTest { @Test public void testPreReduce2() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer, serializer, 5, 2, 0); @@ -122,7 +122,7 @@ public class SlidingCountPreReducerTest { @Test public void testPreReduce3() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer, serializer, 6, 3, 0); @@ -159,7 +159,7 @@ public class SlidingCountPreReducerTest { @Test public void testPreReduce4() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer, serializer, 5, 1, 2); http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java index 3f1cba1..68bceda 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java @@ -31,11 +31,11 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.streaming.api.operators.windowing.WindowIntegrationTest; +import org.apache.flink.streaming.api.operators.windowing.WindowingITCase; import org.apache.flink.streaming.api.windowing.StreamWindow; import org.apache.flink.streaming.api.windowing.helper.Timestamp; import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; -import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector; +import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput; import org.junit.Test; public class SlidingTimeGroupedPreReducerTest { @@ -48,7 +48,7 @@ public class SlidingTimeGroupedPreReducerTest { ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer(); - KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2); + KeySelector<Integer, ?> key = new WindowingITCase.ModKey(2); KeySelector<Tuple2<Integer, Integer>, ?> tupleKey = new TupleModKey(2); @Test @@ -58,7 +58,7 @@ public class SlidingTimeGroupedPreReducerTest { // replaying the same sequence of elements with a later timestamp and expecting the same // result. - TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>(); + TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>(); SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>>(tupleReducer, tupleType.createSerializer(new ExecutionConfig()), tupleKey, 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() { @@ -190,7 +190,7 @@ public class SlidingTimeGroupedPreReducerTest { @Test public void testPreReduce2() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>( reducer, serializer, key, 5, 2, new TimestampWrapper<Integer>( @@ -241,7 +241,7 @@ public class SlidingTimeGroupedPreReducerTest { @Test public void testPreReduce3() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>( reducer, serializer, key, 6, 3, new TimestampWrapper<Integer>( @@ -287,7 +287,7 @@ public class SlidingTimeGroupedPreReducerTest { @Test public void testPreReduce4() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>( reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>( http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java index 0519da7..6a36c57 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java @@ -32,7 +32,7 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.streaming.api.windowing.StreamWindow; import org.apache.flink.streaming.api.windowing.helper.Timestamp; import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; -import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector; +import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput; import org.junit.Test; public class SlidingTimePreReducerTest { @@ -50,7 +50,7 @@ public class SlidingTimePreReducerTest { // replaying the same sequence of elements with a later timestamp and expecting the same // result. - TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>(); + TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>(); SlidingTimePreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimePreReducer<Tuple2<Integer, Integer>>(tupleReducer, tupleType.createSerializer(new ExecutionConfig()), 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() { @@ -145,7 +145,7 @@ public class SlidingTimePreReducerTest { @Test public void testPreReduce2() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer, serializer, 5, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() { @@ -195,7 +195,7 @@ public class SlidingTimePreReducerTest { @Test public void testPreReduce3() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer, serializer, 6, 3, new TimestampWrapper<Integer>(new Timestamp<Integer>() { @@ -240,7 +240,7 @@ public class SlidingTimePreReducerTest { @Test public void testPreReduce4() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>(); SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer, serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() { http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java index c5107bf..3aee288 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java @@ -32,7 +32,7 @@ import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.windowing.StreamWindow; -import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector; +import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput; import org.apache.flink.streaming.util.keys.KeySelectorUtil; import org.junit.Test; @@ -57,7 +57,7 @@ public class TumblingGroupedPreReducerTest { inputs.add(new Tuple2<Integer, Integer>(1, -1)); inputs.add(new Tuple2<Integer, Integer>(1, -2)); - TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>(); + TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>(); List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected(); WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>( @@ -104,7 +104,7 @@ public class TumblingGroupedPreReducerTest { inputs.add(new Tuple2<Integer, Integer>(1, -1)); inputs.add(new Tuple2<Integer, Integer>(1, -2)); - TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>(); + TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>(); List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected(); WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>( http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java index b8de02e..3e537a5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java @@ -27,9 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.windowing.StreamWindow; -import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer; -import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; -import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector; +import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput; import org.junit.Test; public class TumblingPreReducerTest { @@ -49,7 +47,7 @@ public class TumblingPreReducerTest { inputs.add(new Tuple2<Integer, Integer>(3, -1)); inputs.add(new Tuple2<Integer, Integer>(4, -2)); - TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>(); + TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>(); List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected(); WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingPreReducer<Tuple2<Integer, Integer>>( http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java index 3f8401d..d8a3696 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.streaming.runtime.io.BarrierBuffer; import org.junit.Test; public class BarrierBufferIOTest { @@ -55,7 +54,7 @@ public class BarrierBufferIOTest { if (boe.isBuffer()) { boe.getBuffer().recycle(); } else { - barrierBuffer.processSuperstep(boe); + barrierBuffer.processBarrier(boe); } } // System.out.println("Ran for " + (System.currentTimeMillis() - @@ -101,14 +100,14 @@ public class BarrierBufferIOTest { private int numChannels; private BufferPool[] bufferPools; - private int[] currentSupersteps; + private int[] currentBarriers; BarrierGenerator[] barrierGens; int currentChannel = 0; long c = 0; public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) { this.numChannels = bufferPools.length; - this.currentSupersteps = new int[numChannels]; + this.currentBarriers = new int[numChannels]; this.bufferPools = bufferPools; this.barrierGens = barrierGens; } @@ -132,7 +131,7 @@ public class BarrierBufferIOTest { currentChannel = (currentChannel + 1) % numChannels; if (barrierGens[currentChannel].isNextBarrier()) { - return BarrierBufferTest.createSuperstep(++currentSupersteps[currentChannel], + return BarrierBufferTest.createBarrier(++currentBarriers[currentChannel], currentChannel); } else { Buffer buffer = bufferPools[currentChannel].requestBuffer(); http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java index 89ec7dc..cb5e046 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java @@ -35,7 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep; +import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier; import org.junit.Test; @@ -67,10 +67,10 @@ public class BarrierBufferTest { List<BufferOrEvent> input = new LinkedList<BufferOrEvent>(); input.add(createBuffer(0)); input.add(createBuffer(0)); - input.add(createSuperstep(1, 0)); + input.add(createBarrier(1, 0)); input.add(createBuffer(0)); input.add(createBuffer(0)); - input.add(createSuperstep(2, 0)); + input.add(createBarrier(2, 0)); input.add(createBuffer(0)); InputGate mockIG = new MockInputGate(1, input); @@ -82,11 +82,11 @@ public class BarrierBufferTest { assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked()); assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked()); assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked()); - bb.processSuperstep(nextBoe); + bb.processBarrier(nextBoe); assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked()); assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked()); assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked()); - bb.processSuperstep(nextBoe); + bb.processBarrier(nextBoe); assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked()); bb.cleanup(); @@ -98,18 +98,18 @@ public class BarrierBufferTest { List<BufferOrEvent> input = new LinkedList<BufferOrEvent>(); input.add(createBuffer(0)); input.add(createBuffer(1)); - input.add(createSuperstep(1, 0)); - input.add(createSuperstep(2, 0)); + input.add(createBarrier(1, 0)); + input.add(createBarrier(2, 0)); input.add(createBuffer(0)); - input.add(createSuperstep(3, 0)); + input.add(createBarrier(3, 0)); input.add(createBuffer(0)); input.add(createBuffer(1)); - input.add(createSuperstep(1, 1)); + input.add(createBarrier(1, 1)); input.add(createBuffer(0)); input.add(createBuffer(1)); - input.add(createSuperstep(2, 1)); - input.add(createSuperstep(3, 1)); - input.add(createSuperstep(4, 0)); + input.add(createBarrier(2, 1)); + input.add(createBarrier(3, 1)); + input.add(createBarrier(4, 0)); input.add(createBuffer(0)); input.add(new BufferOrEvent(new EndOfPartitionEvent(), 1)); @@ -123,24 +123,24 @@ public class BarrierBufferTest { check(input.get(0), nextBoe = bb.getNextNonBlocked()); check(input.get(1), nextBoe = bb.getNextNonBlocked()); check(input.get(2), nextBoe = bb.getNextNonBlocked()); - bb.processSuperstep(nextBoe); + bb.processBarrier(nextBoe); check(input.get(7), nextBoe = bb.getNextNonBlocked()); check(input.get(8), nextBoe = bb.getNextNonBlocked()); - bb.processSuperstep(nextBoe); + bb.processBarrier(nextBoe); check(input.get(3), nextBoe = bb.getNextNonBlocked()); - bb.processSuperstep(nextBoe); + bb.processBarrier(nextBoe); check(input.get(10), nextBoe = bb.getNextNonBlocked()); check(input.get(11), nextBoe = bb.getNextNonBlocked()); - bb.processSuperstep(nextBoe); + bb.processBarrier(nextBoe); check(input.get(4), nextBoe = bb.getNextNonBlocked()); check(input.get(5), nextBoe = bb.getNextNonBlocked()); - bb.processSuperstep(nextBoe); + bb.processBarrier(nextBoe); check(input.get(12), nextBoe = bb.getNextNonBlocked()); - bb.processSuperstep(nextBoe); + bb.processBarrier(nextBoe); check(input.get(6), nextBoe = bb.getNextNonBlocked()); check(input.get(9), nextBoe = bb.getNextNonBlocked()); check(input.get(13), nextBoe = bb.getNextNonBlocked()); - bb.processSuperstep(nextBoe); + bb.processBarrier(nextBoe); check(input.get(14), nextBoe = bb.getNextNonBlocked()); check(input.get(15), nextBoe = bb.getNextNonBlocked()); @@ -206,8 +206,8 @@ public class BarrierBufferTest { } } - protected static BufferOrEvent createSuperstep(long id, int channel) { - return new BufferOrEvent(new StreamingSuperstep(id, System.currentTimeMillis()), channel); + protected static BufferOrEvent createBarrier(long id, int channel) { + return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel); } protected static BufferOrEvent createBuffer(int channel) { http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java deleted file mode 100644 index 528829d..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.io; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.streaming.runtime.io.BarrierBuffer; -import org.apache.flink.streaming.runtime.io.CoRecordReader; -import org.apache.flink.streaming.runtime.io.BarrierBufferTest.MockInputGate; -import org.junit.Test; - -public class CoRecordReaderTest { - - @Test - public void test() throws InterruptedException, IOException { - - List<BufferOrEvent> input1 = new LinkedList<BufferOrEvent>(); - input1.add(BarrierBufferTest.createBuffer(0)); - input1.add(BarrierBufferTest.createSuperstep(1, 0)); - input1.add(BarrierBufferTest.createBuffer(0)); - - InputGate ig1 = new MockInputGate(1, input1); - - List<BufferOrEvent> input2 = new LinkedList<BufferOrEvent>(); - input2.add(BarrierBufferTest.createBuffer(0)); - input2.add(BarrierBufferTest.createBuffer(0)); - input2.add(BarrierBufferTest.createSuperstep(1, 0)); - input2.add(BarrierBufferTest.createBuffer(0)); - - InputGate ig2 = new MockInputGate(1, input2); - - CoRecordReader<?, ?> coReader = new CoRecordReader<IOReadableWritable, IOReadableWritable>( - ig1, ig2); - BarrierBuffer b1 = coReader.barrierBuffer1; - BarrierBuffer b2 = coReader.barrierBuffer2; - - coReader.addToAvailable(ig1); - coReader.addToAvailable(ig2); - coReader.addToAvailable(ig2); - coReader.addToAvailable(ig1); - - assertEquals(1, coReader.getNextReaderIndexBlocking()); - b1.getNextNonBlocked(); - - assertEquals(2, coReader.getNextReaderIndexBlocking()); - b2.getNextNonBlocked(); - - assertEquals(2, coReader.getNextReaderIndexBlocking()); - b2.getNextNonBlocked(); - - assertEquals(1, coReader.getNextReaderIndexBlocking()); - b1.getNextNonBlocked(); - b1.processSuperstep(input1.get(1)); - - coReader.addToAvailable(ig1); - coReader.addToAvailable(ig2); - coReader.addToAvailable(ig2); - - assertEquals(2, coReader.getNextReaderIndexBlocking()); - b2.getNextNonBlocked(); - b2.processSuperstep(input2.get(2)); - - assertEquals(1, coReader.getNextReaderIndexBlocking()); - b1.getNextNonBlocked(); - - assertEquals(2, coReader.getNextReaderIndexBlocking()); - b2.getNextNonBlocked(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java index aa4d24a..a1cea13 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertArrayEquals; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.junit.Before; import org.junit.Test; @@ -32,7 +31,7 @@ public class BroadcastPartitionerTest { private BroadcastPartitioner<Tuple> broadcastPartitioner2; private BroadcastPartitioner<Tuple> broadcastPartitioner3; - private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(); + private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null); private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(null); @Before http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java index b37e43a..2643bba 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java @@ -28,7 +28,7 @@ import org.junit.Test; public class DistributePartitionerTest { private RebalancePartitioner<Tuple> distributePartitioner; - private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(); + private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null); private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>( null); http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java index 94d29ac..05541f5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java @@ -21,34 +21,28 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.junit.Before; import org.junit.Test; public class FieldsPartitionerTest { - private FieldsPartitioner<Tuple> fieldsPartitioner; - private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>() - .setObject(new Tuple2<String, Integer>("test", 0)); - private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>() - .setObject(new Tuple2<String, Integer>("test", 42)); - private SerializationDelegate<StreamRecord<Tuple>> sd1 = new SerializationDelegate<StreamRecord<Tuple>>( - null); - private SerializationDelegate<StreamRecord<Tuple>> sd2 = new SerializationDelegate<StreamRecord<Tuple>>( - null); + private FieldsPartitioner<Tuple2<String, Integer>> fieldsPartitioner; + private StreamRecord<Tuple2<String, Integer>> streamRecord1 = new StreamRecord<Tuple2<String, Integer>>(new Tuple2<String, Integer>("test", 0)); + private StreamRecord<Tuple2<String, Integer>> streamRecord2 = new StreamRecord<Tuple2<String, Integer>>(new Tuple2<String, Integer>("test", 42)); + private SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> sd1 = new SerializationDelegate<StreamRecord<Tuple2<String, Integer>>>(null); + private SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> sd2 = new SerializationDelegate<StreamRecord<Tuple2<String, Integer>>>(null); @Before public void setPartitioner() { - fieldsPartitioner = new FieldsPartitioner<Tuple>(new KeySelector<Tuple, String>() { + fieldsPartitioner = new FieldsPartitioner<Tuple2<String, Integer>>(new KeySelector<Tuple2<String, Integer>, String>() { private static final long serialVersionUID = 1L; @Override - public String getKey(Tuple value) throws Exception { + public String getKey(Tuple2<String, Integer> value) throws Exception { return value.getField(0); } });