http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java new file mode 100644 index 0000000..8499aa2 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.RichFoldFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link StreamGroupedFold}. These test that:
 *
 * <ul>
 *     <li>RichFunction methods are called correctly</li>
 *     <li>Timestamps of processed elements match the input timestamp</li>
 *     <li>Watermarks are correctly forwarded</li>
 * </ul>
 */
public class StreamGroupedFoldTest {

    /** Fold function that appends the string form of every value to the accumulator. */
    private static class MyFolder implements FoldFunction<Integer, String> {

        private static final long serialVersionUID = 1L;

        @Override
        public String fold(String accumulator, Integer value) throws Exception {
            return accumulator + value.toString();
        }

    }

    /**
     * Sends five records and one watermark through the operator and compares the
     * emitted sequence with the expected one.
     *
     * <p>Elements are keyed by their string form and every key starts from the
     * initial accumulator {@code "100"}, so the second element of a key is appended
     * to the result of the first (e.g. {@code "1001"} then {@code "10011"}).
     */
    @Test
    @SuppressWarnings("unchecked")
    public void testGroupedFold() throws Exception {
        TypeInformation<String> outType = TypeExtractor.getForObject("A string");

        KeySelector<Integer, String> keyByStringValue = new KeySelector<Integer, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String getKey(Integer value) throws Exception {
                return value.toString();
            }
        };

        StreamGroupedFold<Integer, String> operator =
                new StreamGroupedFold<Integer, String>(new MyFolder(), keyByStringValue, "100", outType);

        OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                new OneInputStreamOperatorTestHarness<Integer, String>(operator);

        long initialTime = 0L;
        // Holds StreamRecords as well as Watermarks, hence the Object element type
        // instead of a raw queue.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.open();

        testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
        testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 2));
        testHarness.processWatermark(new Watermark(initialTime + 2));
        testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 3));
        testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 4));
        testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 5));

        expectedOutput.add(new StreamRecord<String>("1001", initialTime + 1));
        expectedOutput.add(new StreamRecord<String>("10011", initialTime + 2));
        expectedOutput.add(new Watermark(initialTime + 2));
        expectedOutput.add(new StreamRecord<String>("1002", initialTime + 3));
        expectedOutput.add(new StreamRecord<String>("10022", initialTime + 4));
        expectedOutput.add(new StreamRecord<String>("1003", initialTime + 5));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    /**
     * Checks that {@code open()} and {@code close()} of a rich fold function are both
     * invoked, in that order, when the operator is opened, fed and closed.
     */
    @Test
    public void testOpenClose() throws Exception {
        // The flags are static: clear them so that a repeated run in the same JVM
        // does not trip the "Close called before open." check in open().
        TestOpenCloseFoldFunction.openCalled = false;
        TestOpenCloseFoldFunction.closeCalled = false;

        KeySelector<Integer, Integer> identityKey = new KeySelector<Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer getKey(Integer value) throws Exception {
                return value;
            }
        };

        StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<Integer, String>(
                new TestOpenCloseFoldFunction(), identityKey, "init", BasicTypeInfo.STRING_TYPE_INFO);
        OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                new OneInputStreamOperatorTestHarness<Integer, String>(operator);

        long initialTime = 0L;

        testHarness.open();

        testHarness.processElement(new StreamRecord<Integer>(1, initialTime));
        testHarness.processElement(new StreamRecord<Integer>(2, initialTime));

        testHarness.close();

        // Both lifecycle callbacks must have been observed, not just close().
        Assert.assertTrue("RichFunction methods were not called.",
                TestOpenCloseFoldFunction.openCalled && TestOpenCloseFoldFunction.closeCalled);
        Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
    }

    // This must only be used in one test, otherwise the static fields will be changed
    // by several tests concurrently
    /**
     * Rich fold function that records its lifecycle calls in static flags and fails
     * the test when they happen out of order.
     */
    private static class TestOpenCloseFoldFunction extends RichFoldFunction<Integer, String> {
        private static final long serialVersionUID = 1L;

        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            if (closeCalled) {
                Assert.fail("Close called before open.");
            }
            openCalled = true;
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail("Open was not called before close.");
            }
            closeCalled = true;
        }

        @Override
        public String fold(String acc, Integer in) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            return acc + in;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java new file mode 100644 index 0000000..dca1cbb --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.api.operators;


import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link StreamGroupedReduce}. These test that:
 *
 * <ul>
 *     <li>RichFunction methods are called correctly</li>
 *     <li>Timestamps of processed elements match the input timestamp</li>
 *     <li>Watermarks are correctly forwarded</li>
 * </ul>
 */

public class StreamGroupedReduceTest {

    /** Reduce function that sums its two arguments. */
    private static class MyReducer implements ReduceFunction<Integer> {

        private static final long serialVersionUID = 1L;

        @Override
        public Integer reduce(Integer value1, Integer value2) throws Exception {
            return value1 + value2;
        }

    }

    /**
     * Sends five records and one watermark through the operator and compares the
     * emitted sequence with the expected one.
     *
     * <p>Elements are keyed by their own value, so the second element of a key is
     * summed with the first (e.g. {@code 1} then {@code 2} for key 1).
     */
    @Test
    @SuppressWarnings("unchecked")
    public void testGroupedReduce() throws Exception {
        KeySelector<Integer, Integer> identityKey = new KeySelector<Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer getKey(Integer value) throws Exception {
                return value;
            }
        };

        StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<Integer>(new MyReducer(), identityKey);

        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
                new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);

        long initialTime = 0L;
        // Holds StreamRecords as well as Watermarks, hence the Object element type
        // instead of a raw queue.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.open();

        testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
        testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 2));
        testHarness.processWatermark(new Watermark(initialTime + 2));
        testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 3));
        testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 4));
        testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 5));

        expectedOutput.add(new StreamRecord<Integer>(1, initialTime + 1));
        expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
        expectedOutput.add(new Watermark(initialTime + 2));
        expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 3));
        expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
        expectedOutput.add(new StreamRecord<Integer>(3, initialTime + 5));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    /**
     * Checks that {@code open()} and {@code close()} of a rich reduce function are
     * both invoked, in that order, when the operator is opened, fed and closed.
     */
    @Test
    public void testOpenClose() throws Exception {
        // The flags are static: clear them so that a repeated run in the same JVM
        // does not trip the "Close called before open." check in open().
        TestOpenCloseReduceFunction.openCalled = false;
        TestOpenCloseReduceFunction.closeCalled = false;

        KeySelector<Integer, Integer> identityKey = new KeySelector<Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer getKey(Integer value) throws Exception {
                return value;
            }
        };

        StreamGroupedReduce<Integer> operator =
                new StreamGroupedReduce<Integer>(new TestOpenCloseReduceFunction(), identityKey);
        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
                new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);

        long initialTime = 0L;

        testHarness.open();

        testHarness.processElement(new StreamRecord<Integer>(1, initialTime));
        testHarness.processElement(new StreamRecord<Integer>(2, initialTime));

        testHarness.close();

        // Both lifecycle callbacks must have been observed, not just close().
        Assert.assertTrue("RichFunction methods were not called.",
                TestOpenCloseReduceFunction.openCalled && TestOpenCloseReduceFunction.closeCalled);
        Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
    }

    // This must only be used in one test, otherwise the static fields will be changed
    // by several tests concurrently
    /**
     * Rich reduce function that records its lifecycle calls in static flags and fails
     * the test when they happen out of order.
     */
    private static class TestOpenCloseReduceFunction extends RichReduceFunction<Integer> {
        private static final long serialVersionUID = 1L;

        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            if (closeCalled) {
                Assert.fail("Close called before open.");
            }
            openCalled = true;
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail("Open was not called before close.");
            }
            closeCalled = true;
        }

        @Override
        public Integer reduce(Integer in1, Integer in2) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            return in1 + in2;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java new file mode 100644 index 0000000..d5f2f62 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link StreamMap}. These test that:
 *
 * <ul>
 *     <li>RichFunction methods are called correctly</li>
 *     <li>Timestamps of processed elements match the input timestamp</li>
 *     <li>Watermarks are correctly forwarded</li>
 * </ul>
 */
public class StreamMapTest {

    /** Map function that increments the value and renders it with a leading {@code "+"}. */
    private static class Map implements MapFunction<Integer, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String map(Integer value) throws Exception {
            return "+" + (value + 1);
        }
    }

    /**
     * Sends three records and one watermark through the operator and compares the
     * emitted sequence (mapped values, unchanged timestamps, watermark in place)
     * with the expected one.
     */
    @Test
    public void testMap() throws Exception {
        StreamMap<Integer, String> operator = new StreamMap<Integer, String>(new Map());

        OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                new OneInputStreamOperatorTestHarness<Integer, String>(operator);

        long initialTime = 0L;
        // Holds StreamRecords as well as Watermarks, hence the Object element type
        // instead of a raw queue.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.open();

        testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
        testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
        testHarness.processWatermark(new Watermark(initialTime + 2));
        testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));

        expectedOutput.add(new StreamRecord<String>("+2", initialTime + 1));
        expectedOutput.add(new StreamRecord<String>("+3", initialTime + 2));
        expectedOutput.add(new Watermark(initialTime + 2));
        expectedOutput.add(new StreamRecord<String>("+4", initialTime + 3));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    /**
     * Checks that {@code open()} and {@code close()} of a rich map function are both
     * invoked, in that order, when the operator is opened, fed and closed.
     */
    @Test
    public void testOpenClose() throws Exception {
        // The flags are static: clear them so that a repeated run in the same JVM
        // does not trip the "Close called before open." check in open().
        TestOpenCloseMapFunction.openCalled = false;
        TestOpenCloseMapFunction.closeCalled = false;

        StreamMap<String, String> operator = new StreamMap<String, String>(new TestOpenCloseMapFunction());

        OneInputStreamOperatorTestHarness<String, String> testHarness =
                new OneInputStreamOperatorTestHarness<String, String>(operator);

        long initialTime = 0L;

        testHarness.open();

        testHarness.processElement(new StreamRecord<String>("Hello", initialTime));

        testHarness.close();

        // Both lifecycle callbacks must have been observed, not just close().
        Assert.assertTrue("RichFunction methods were not called.",
                TestOpenCloseMapFunction.openCalled && TestOpenCloseMapFunction.closeCalled);
        Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
    }

    // This must only be used in one test, otherwise the static fields will be changed
    // by several tests concurrently
    /**
     * Rich map function that records its lifecycle calls in static flags and fails
     * the test when they happen out of order. Values pass through unchanged.
     */
    private static class TestOpenCloseMapFunction extends RichMapFunction<String, String> {
        private static final long serialVersionUID = 1L;

        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            if (closeCalled) {
                Assert.fail("Close called before open.");
            }
            openCalled = true;
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail("Open was not called before close.");
            }
            closeCalled = true;
        }

        @Override
        public String map(String value) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            return value;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java new file mode 100644 index 0000000..ede7db5 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.streaming.api.operators;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.Serializable;
import java.util.HashSet;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.streaming.api.datastream.StreamProjection;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.joda.time.Instant;
import org.junit.Test;

/**
 * Tests for {@link StreamProject}. These test that:
 *
 * <ul>
 *     <li>Timestamps of processed elements match the input timestamp</li>
 *     <li>Watermarks are correctly forwarded</li>
 * </ul>
 */
public class StreamProjectTest implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Projects fields {@code {4, 4, 3}} out of five-field tuples and compares the
     * emitted records and watermark with the expected sequence.
     */
    @Test
    public void testProject() throws Exception {

        TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
                .getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));

        int[] fields = new int[]{4, 4, 3};

        TupleSerializer<Tuple3<Integer, Integer, String>> serializer =
                new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection.extractFieldTypes(fields, inType))
                        .createSerializer(new ExecutionConfig());
        @SuppressWarnings("unchecked")
        StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator =
                new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
                        fields, serializer);

        OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> testHarness =
                new OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(operator);

        long initialTime = 0L;
        // Holds StreamRecords as well as Watermarks, hence the Object element type
        // instead of a raw queue.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.open();

        testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(
                new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4), initialTime + 1));
        testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(
                new Tuple5<Integer, String, Integer, String, Integer>(2, "s", 3, "c", 2), initialTime + 2));
        testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(
                new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "c", 2), initialTime + 3));
        testHarness.processWatermark(new Watermark(initialTime + 2));
        testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(
                new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "a", 7), initialTime + 4));

        // Each output tuple is (f4, f4, f3) of the corresponding input tuple.
        expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(
                new Tuple3<Integer, Integer, String>(4, 4, "b"), initialTime + 1));
        expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(
                new Tuple3<Integer, Integer, String>(2, 2, "c"), initialTime + 2));
        expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(
                new Tuple3<Integer, Integer, String>(2, 2, "c"), initialTime + 3));
        expectedOutput.add(new Watermark(initialTime + 2));
        expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(
                new Tuple3<Integer, Integer, String>(7, 7, "a"), initialTime + 4));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }


    // tests using projection from the API without explicitly specifying the types
    // NOTE(review): second argument of TestStreamEnvironment; unit not visible here - confirm.
    private static final long MEMORY_SIZE = 32;
    private static final HashSet<Tuple2<Long, Double>> expected = new HashSet<Tuple2<Long, Double>>();
    private static final HashSet<Tuple2<Long, Double>> actual = new HashSet<Tuple2<Long, Double>>();

    /**
     * Runs a small job that maps the sequence 1..10 to three-field tuples, projects
     * fields 0 and 2 through the API and collects the results in a sink, then
     * compares the collected set with the expected {@code (i, (double) i)} pairs.
     *
     * @throws Exception if the job fails; propagated so that the original cause and
     *         stack trace show up in the test report
     */
    @Test
    public void APIWithoutTypesTest() throws Exception {
        // The sets are static: start from a clean state on every run.
        expected.clear();
        actual.clear();

        for (long i = 1L; i < 11L; i++) {
            expected.add(new Tuple2<Long, Double>(i, (double) i));
        }

        StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);

        env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
                @Override
                public Tuple3<Long, Character, Double> map(Long value) throws Exception {
                    return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
                }
            })
            .project(0, 2)
            .addSink(new SinkFunction<Tuple>() {
                @Override
                @SuppressWarnings("unchecked")
                public void invoke(Tuple value) throws Exception {
                    actual.add((Tuple2<Long, Double>) value);
                }
            });

        env.execute();

        assertEquals(expected, actual);
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java deleted file mode 100644 index 7f23e23..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.operators.co; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; -import org.apache.flink.streaming.util.MockCoContext; -import org.apache.flink.util.Collector; -import org.junit.Test; - -public class CoFlatMapTest implements Serializable { - private static final long serialVersionUID = 1L; - - private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> { - private static final long serialVersionUID = 1L; - - @Override - public void flatMap1(String value, Collector<String> coll) { - for (int i = 0; i < value.length(); i++) { - coll.collect(value.substring(i, i + 1)); - } - } - - @Override - public void flatMap2(Integer value, Collector<String> coll) { - coll.collect(value.toString()); - } - } - - @Test - public void coFlatMapTest() { - CoStreamFlatMap<String, Integer, String> invokable = new CoStreamFlatMap<String, Integer, String>( - new MyCoFlatMap()); - - List<String> expectedList = Arrays.asList("a", "b", "c", "1", "d", "e", "f", "2", "g", "h", - "e", "3", "4", "5"); - List<String> actualList = MockCoContext.createAndExecute(invokable, - Arrays.asList("abc", "def", "ghe"), Arrays.asList(1, 2, 3, 4, 5)); - - assertEquals(expectedList, actualList); - } - - @SuppressWarnings("unchecked") - @Test - public void multipleInputTest() { - LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); - - DataStream<Integer> ds1 = env.fromElements(1, 3, 5); - DataStream<Integer> ds2 = env.fromElements(2, 4).union(ds1); - - try { - ds1.forward().union(ds2); - fail(); - } 
catch (RuntimeException e) { - // expected - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java index d01d0d3..39e85e9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java @@ -1,125 +1,125 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.operators.co; - -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.functions.co.CoReduceFunction; -import org.apache.flink.streaming.api.operators.co.CoStreamGroupedReduce; -import org.apache.flink.streaming.util.MockCoContext; -import org.junit.Test; - -public class CoGroupedReduceTest { - - private final static class MyCoReduceFunction implements - CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1, - Tuple3<String, String, String> value2) { - return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2); - } - - @Override - public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1, - Tuple2<Integer, Integer> value2) { - return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1); - } - - @Override - public String map1(Tuple3<String, String, String> value) { - return value.f1; - } - - @Override - public String map2(Tuple2<Integer, Integer> value) { - return value.f1.toString(); - } - } - - @SuppressWarnings("unchecked") - @Test - public void coGroupedReduceTest() { - Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b"); - Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a"); - Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a"); - Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1); - Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2); - Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 
3); - Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4); - Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5); - - KeySelector<Tuple3<String, String, String>, ?> keySelector0 = new KeySelector<Tuple3<String, String, String>, String>() { - - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Tuple3<String, String, String> value) throws Exception { - return value.f0; - } - }; - - KeySelector<Tuple2<Integer, Integer>, ?> keySelector1 = new KeySelector<Tuple2<Integer, Integer>, Integer>() { - - private static final long serialVersionUID = 1L; - - @Override - public Integer getKey(Tuple2<Integer, Integer> value) throws Exception { - return value.f0; - } - }; - - KeySelector<Tuple3<String, String, String>, ?> keySelector2 = new KeySelector<Tuple3<String, String, String>, String>() { - - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Tuple3<String, String, String> value) throws Exception { - return value.f2; - } - }; - - CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>( - new MyCoReduceFunction(), keySelector0, keySelector1); - - List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5", - "7"); - - List<String> actualList = MockCoContext.createAndExecute(invokable, - Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5)); - - assertEquals(expected, actualList); - - invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>( - new MyCoReduceFunction(), keySelector2, keySelector1); - - expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7"); - - actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3), - Arrays.asList(int1, int2, int3, int4, int5)); - - 
assertEquals(expected, actualList); - } -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.flink.streaming.api.operators.co; +// +//import static org.junit.Assert.assertEquals; +// +//import java.util.Arrays; +//import java.util.List; +// +//import org.apache.flink.api.java.functions.KeySelector; +//import org.apache.flink.api.java.tuple.Tuple2; +//import org.apache.flink.api.java.tuple.Tuple3; +//import org.apache.flink.streaming.api.functions.co.CoReduceFunction; +//import org.apache.flink.streaming.api.operators.co.CoStreamGroupedReduce; +//import org.apache.flink.streaming.util.MockCoContext; +//import org.junit.Test; +// +//public class CoGroupedReduceTest { +// +// private final static class MyCoReduceFunction implements +// CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> { +// private static final long serialVersionUID = 1L; +// +// @Override +// public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1, +// Tuple3<String, String, String> value2) { +// return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2); +// } +// +// @Override +// public Tuple2<Integer, 
Integer> reduce2(Tuple2<Integer, Integer> value1, +// Tuple2<Integer, Integer> value2) { +// return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1); +// } +// +// @Override +// public String map1(Tuple3<String, String, String> value) { +// return value.f1; +// } +// +// @Override +// public String map2(Tuple2<Integer, Integer> value) { +// return value.f1.toString(); +// } +// } +// +// @SuppressWarnings("unchecked") +// @Test +// public void coGroupedReduceTest() { +// Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b"); +// Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a"); +// Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a"); +// Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1); +// Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2); +// Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3); +// Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4); +// Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5); +// +// KeySelector<Tuple3<String, String, String>, ?> keySelector0 = new KeySelector<Tuple3<String, String, String>, String>() { +// +// private static final long serialVersionUID = 1L; +// +// @Override +// public String getKey(Tuple3<String, String, String> value) throws Exception { +// return value.f0; +// } +// }; +// +// KeySelector<Tuple2<Integer, Integer>, ?> keySelector1 = new KeySelector<Tuple2<Integer, Integer>, Integer>() { +// +// private static final long serialVersionUID = 1L; +// +// @Override +// public Integer getKey(Tuple2<Integer, Integer> value) throws Exception { +// return value.f0; +// } +// }; +// +// KeySelector<Tuple3<String, String, String>, ?> keySelector2 = new KeySelector<Tuple3<String, String, String>, String>() { +// +// private static final long serialVersionUID = 1L; +// +// @Override +// public String 
getKey(Tuple3<String, String, String> value) throws Exception { +// return value.f2; +// } +// }; +// +// CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>( +// new MyCoReduceFunction(), keySelector0, keySelector1); +// +// List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5", +// "7"); +// +// List<String> actualList = MockCoContext.createAndExecute(invokable, +// Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5)); +// +// assertEquals(expected, actualList); +// +// invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>( +// new MyCoReduceFunction(), keySelector2, keySelector1); +// +// expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7"); +// +// actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3), +// Arrays.asList(int1, int2, int3, int4, int5)); +// +// assertEquals(expected, actualList); +// } +//} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoMapTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoMapTest.java deleted file mode 100644 index 2a2560d..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoMapTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators.co; - -import static org.junit.Assert.assertEquals; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.streaming.api.functions.co.CoMapFunction; -import org.apache.flink.streaming.api.operators.co.CoStreamMap; -import org.apache.flink.streaming.util.MockCoContext; -import org.junit.Test; - -public class CoMapTest implements Serializable { - private static final long serialVersionUID = 1L; - - private final static class MyCoMap implements CoMapFunction<Double, Integer, String> { - private static final long serialVersionUID = 1L; - - @Override - public String map1(Double value) { - return value.toString(); - } - - @Override - public String map2(Integer value) { - return value.toString(); - } - } - - @Test - public void coMapTest() { - CoStreamMap<Double, Integer, String> invokable = new CoStreamMap<Double, Integer, String>(new MyCoMap()); - - List<String> expectedList = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5"); - List<String> actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3)); - - assertEquals(expectedList, actualList); - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java new file mode 100644 index 0000000..2c9ba5c --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.co; + +import static org.junit.Assert.fail; + +import java.io.Serializable; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; +import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for {@link CoStreamFlatMap}. 
These test that: + * + * <ul> + * <li>RichFunction methods are called correctly</li> + * <li>Timestamps of processed elements match the input timestamp</li> + * <li>Watermarks are correctly forwarded</li> + * </ul> + */ +public class CoStreamFlatMapTest implements Serializable { + private static final long serialVersionUID = 1L; + + private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap1(String value, Collector<String> coll) { + for (int i = 0; i < value.length(); i++) { + coll.collect(value.substring(i, i + 1)); + } + } + + @Override + public void flatMap2(Integer value, Collector<String> coll) { + coll.collect(value.toString()); + } + } + + @Test + public void testCoFlatMap() throws Exception { + CoStreamFlatMap<String, Integer, String> operator = new CoStreamFlatMap<String, Integer, String>(new MyCoFlatMap()); + + TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<String, Integer, String>(operator); + + long initialTime = 0L; + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue(); + + testHarness.open(); + + testHarness.processElement1(new StreamRecord<String>("abc", initialTime + 1)); + testHarness.processElement1(new StreamRecord<String>("def", initialTime + 2)); + testHarness.processWatermark1(new Watermark(initialTime + 2)); + testHarness.processElement1(new StreamRecord<String>("ghi", initialTime + 3)); + + testHarness.processElement2(new StreamRecord<Integer>(1, initialTime + 1)); + testHarness.processElement2(new StreamRecord<Integer>(2, initialTime + 2)); + testHarness.processWatermark2(new Watermark(initialTime + 3)); + testHarness.processElement2(new StreamRecord<Integer>(3, initialTime + 3)); + testHarness.processElement2(new StreamRecord<Integer>(4, initialTime + 4)); + testHarness.processElement2(new StreamRecord<Integer>(5, initialTime + 5)); + + 
expectedOutput.add(new StreamRecord<String>("a", initialTime + 1)); + expectedOutput.add(new StreamRecord<String>("b", initialTime + 1)); + expectedOutput.add(new StreamRecord<String>("c", initialTime + 1)); + expectedOutput.add(new StreamRecord<String>("d", initialTime + 2)); + expectedOutput.add(new StreamRecord<String>("e", initialTime + 2)); + expectedOutput.add(new StreamRecord<String>("f", initialTime + 2)); + expectedOutput.add(new StreamRecord<String>("g", initialTime + 3)); + expectedOutput.add(new StreamRecord<String>("h", initialTime + 3)); + expectedOutput.add(new StreamRecord<String>("i", initialTime + 3)); + + expectedOutput.add(new StreamRecord<String>("1", initialTime + 1)); + expectedOutput.add(new StreamRecord<String>("2", initialTime + 2)); + expectedOutput.add(new Watermark(initialTime + 2)); + expectedOutput.add(new StreamRecord<String>("3", initialTime + 3)); + expectedOutput.add(new StreamRecord<String>("4", initialTime + 4)); + expectedOutput.add(new StreamRecord<String>("5", initialTime + 5)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + } + + @Test + public void testOpenClose() throws Exception { + CoStreamFlatMap<String, Integer, String> operator = new CoStreamFlatMap<String, Integer, String>(new TestOpenCloseCoFlatMapFunction()); + + TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<String, Integer, String>(operator); + + long initialTime = 0L; + + testHarness.open(); + + testHarness.processElement1(new StreamRecord<String>("Hello", initialTime)); + testHarness.processElement2(new StreamRecord<Integer>(42, initialTime)); + + testHarness.close(); + + Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseCoFlatMapFunction.closeCalled); + Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0); + } + + // This must only be used in one test, otherwise the static fields 
will be changed + // by several tests concurrently + private static class TestOpenCloseCoFlatMapFunction extends RichCoFlatMapFunction<String, Integer, String> { + private static final long serialVersionUID = 1L; + + public static boolean openCalled = false; + public static boolean closeCalled = false; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + if (closeCalled) { + Assert.fail("Close called before open."); + } + openCalled = true; + } + + @Override + public void close() throws Exception { + super.close(); + if (!openCalled) { + Assert.fail("Open was not called before close."); + } + closeCalled = true; + } + + @Override + public void flatMap1(String value, Collector<String> out) throws Exception { + if (!openCalled) { + Assert.fail("Open was not called before run."); + } + out.collect(value); + } + + @Override + public void flatMap2(Integer value, Collector<String> out) throws Exception { + if (!openCalled) { + Assert.fail("Open was not called before run."); + } + out.collect(value.toString()); + } + + } + + @SuppressWarnings("unchecked") + @Test + public void multipleInputTest() { + LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); + + DataStream<Integer> ds1 = env.fromElements(1, 3, 5); + DataStream<Integer> ds2 = env.fromElements(2, 4).union(ds1); + + try { + ds1.forward().union(ds2); + fail(); + } catch (RuntimeException e) { + // expected + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java new file mode 100644 index 0000000..dcf4972 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.co; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; +import org.apache.flink.streaming.api.functions.co.CoMapFunction; +import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; +import org.apache.flink.streaming.api.functions.co.RichCoMapFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; +import org.junit.Assert; +import org.junit.Test; + +import java.io.Serializable; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.fail; + +/** + * Tests for {@link org.apache.flink.streaming.api.operators.co.CoStreamMap}. 
These test that: + * + * <ul> + * <li>RichFunction methods are called correctly</li> + * <li>Timestamps of processed elements match the input timestamp</li> + * <li>Watermarks are correctly forwarded</li> + * </ul> + */ +public class CoStreamMapTest implements Serializable { + private static final long serialVersionUID = 1L; + + private final static class MyCoMap implements CoMapFunction<Double, Integer, String> { + private static final long serialVersionUID = 1L; + + @Override + public String map1(Double value) { + return value.toString(); + } + + @Override + public String map2(Integer value) { + return value.toString(); + } + } + + + @Test + public void testCoMap() throws Exception { + CoStreamMap<Double, Integer, String> operator = new CoStreamMap<Double, Integer, String>(new MyCoMap()); + + TwoInputStreamOperatorTestHarness<Double, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<Double, Integer, String>(operator); + + long initialTime = 0L; + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue(); + + testHarness.open(); + + testHarness.processElement1(new StreamRecord<Double>(1.1d, initialTime + 1)); + testHarness.processElement1(new StreamRecord<Double>(1.2d, initialTime + 2)); + testHarness.processElement1(new StreamRecord<Double>(1.3d, initialTime + 3)); + testHarness.processWatermark1(new Watermark(initialTime + 3)); + testHarness.processElement1(new StreamRecord<Double>(1.4d, initialTime + 4)); + testHarness.processElement1(new StreamRecord<Double>(1.5d, initialTime + 5)); + + testHarness.processElement2(new StreamRecord<Integer>(1, initialTime + 1)); + testHarness.processElement2(new StreamRecord<Integer>(2, initialTime + 2)); + testHarness.processWatermark2(new Watermark(initialTime + 2)); + testHarness.processElement2(new StreamRecord<Integer>(3, initialTime + 3)); + testHarness.processElement2(new StreamRecord<Integer>(4, initialTime + 4)); + testHarness.processElement2(new StreamRecord<Integer>(5, initialTime + 5)); + 
+ expectedOutput.add(new StreamRecord<String>("1.1", initialTime + 1)); + expectedOutput.add(new StreamRecord<String>("1.2", initialTime + 2)); + expectedOutput.add(new StreamRecord<String>("1.3", initialTime + 3)); + expectedOutput.add(new StreamRecord<String>("1.4", initialTime + 4)); + expectedOutput.add(new StreamRecord<String>("1.5", initialTime + 5)); + + expectedOutput.add(new StreamRecord<String>("1", initialTime + 1)); + expectedOutput.add(new StreamRecord<String>("2", initialTime + 2)); + expectedOutput.add(new Watermark(initialTime + 2)); + expectedOutput.add(new StreamRecord<String>("3", initialTime + 3)); + expectedOutput.add(new StreamRecord<String>("4", initialTime + 4)); + expectedOutput.add(new StreamRecord<String>("5", initialTime + 5)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + } + + @Test + public void testOpenClose() throws Exception { + CoStreamMap<Double, Integer, String> operator = new CoStreamMap<Double, Integer, String>(new TestOpenCloseCoMapFunction()); + + TwoInputStreamOperatorTestHarness<Double, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<Double, Integer, String>(operator); + + long initialTime = 0L; + + testHarness.open(); + + testHarness.processElement1(new StreamRecord<Double>(74d, initialTime)); + testHarness.processElement2(new StreamRecord<Integer>(42, initialTime)); + + testHarness.close(); + + Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseCoMapFunction.closeCalled); + Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0); + } + + // This must only be used in one test, otherwise the static fields will be changed + // by several tests concurrently + private static class TestOpenCloseCoMapFunction extends RichCoMapFunction<Double, Integer, String> { + private static final long serialVersionUID = 1L; + + public static boolean openCalled = false; + public static boolean closeCalled = 
false; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + if (closeCalled) { + Assert.fail("Close called before open."); + } + openCalled = true; + } + + @Override + public void close() throws Exception { + super.close(); + if (!openCalled) { + Assert.fail("Open was not called before close."); + } + closeCalled = true; + } + + @Override + public String map1(Double value) throws Exception { + if (!openCalled) { + Assert.fail("Open was not called before run."); + } + return value.toString(); + } + + @Override + public String map2(Integer value) throws Exception { + if (!openCalled) { + Assert.fail("Open was not called before run."); + } + return value.toString(); + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java index c0f49c7..130842e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java @@ -1,182 +1,182 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators.co; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.functions.co.CoWindowFunction; -import org.apache.flink.streaming.api.operators.co.CoStreamWindow; -import org.apache.flink.streaming.api.windowing.helper.Timestamp; -import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; -import org.apache.flink.streaming.util.MockCoContext; -import org.apache.flink.util.Collector; -import org.junit.Test; - -public class CoWindowTest { - - public static final class MyCoGroup1 implements CoWindowFunction<Integer, Integer, Integer> { - - private static final long serialVersionUID = 1L; - - @SuppressWarnings("unused") - @Override - public void coWindow(List<Integer> first, List<Integer> second, Collector<Integer> out) - throws Exception { - Integer count1 = 0; - for (Integer i : first) { - count1++; - } - Integer count2 = 0; - for (Integer i : second) { - count2++; - } - out.collect(count1); - out.collect(count2); - - } - - } - - public static final class MyCoGroup2 implements - CoWindowFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> { - - private static final long serialVersionUID = 1L; - - @Override - public void coWindow(List<Tuple2<Integer, Integer>> first, - List<Tuple2<Integer, Integer>> second, Collector<Integer> out) throws Exception { - - Set<Integer> firstElements = new 
HashSet<Integer>(); - for (Tuple2<Integer, Integer> value : first) { - firstElements.add(value.f1); - } - for (Tuple2<Integer, Integer> value : second) { - if (firstElements.contains(value.f1)) { - out.collect(value.f1); - } - } - - } - - } - - private static final class MyTS1 implements Timestamp<Integer> { - - private static final long serialVersionUID = 1L; - - @Override - public long getTimestamp(Integer value) { - return value; - } - - } - - private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> { - - private static final long serialVersionUID = 1L; - - @Override - public long getTimestamp(Tuple2<Integer, Integer> value) { - return value.f0; - } - - } - - @Test - public void coWindowGroupReduceTest2() throws Exception { - - CoStreamWindow<Integer, Integer, Integer> invokable1 = new CoStreamWindow<Integer, Integer, Integer>( - new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1), - new TimestampWrapper<Integer>(new MyTS1(), 1)); - - // Windowsize 2, slide 1 - // 1,2|2,3|3,4|4,5 - - List<Integer> input11 = new ArrayList<Integer>(); - input11.add(1); - input11.add(1); - input11.add(2); - input11.add(3); - input11.add(3); - - List<Integer> input12 = new ArrayList<Integer>(); - input12.add(1); - input12.add(2); - input12.add(3); - input12.add(3); - input12.add(5); - - // Windows: (1,1,2)(1,1,2)|(2,3,3)(2,3,3)|(3,3)(3,3)|(5)(5) - // expected output: 3,2|3,3|2,2|0,1 - - List<Integer> expected1 = new ArrayList<Integer>(); - expected1.add(3); - expected1.add(2); - expected1.add(3); - expected1.add(3); - expected1.add(2); - expected1.add(2); - expected1.add(0); - expected1.add(1); - - List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12); - assertEquals(expected1, actual1); - - CoStreamWindow<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoStreamWindow<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>( - new MyCoGroup2(), 2, 3, new 
TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), - 1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1)); - - // WindowSize 2, slide 3 - // 1,2|4,5|7,8| - - List<Tuple2<Integer, Integer>> input21 = new ArrayList<Tuple2<Integer, Integer>>(); - input21.add(new Tuple2<Integer, Integer>(1, 1)); - input21.add(new Tuple2<Integer, Integer>(1, 2)); - input21.add(new Tuple2<Integer, Integer>(2, 3)); - input21.add(new Tuple2<Integer, Integer>(3, 4)); - input21.add(new Tuple2<Integer, Integer>(3, 5)); - input21.add(new Tuple2<Integer, Integer>(4, 6)); - input21.add(new Tuple2<Integer, Integer>(4, 7)); - input21.add(new Tuple2<Integer, Integer>(5, 8)); - - List<Tuple2<Integer, Integer>> input22 = new ArrayList<Tuple2<Integer, Integer>>(); - input22.add(new Tuple2<Integer, Integer>(1, 1)); - input22.add(new Tuple2<Integer, Integer>(2, 0)); - input22.add(new Tuple2<Integer, Integer>(2, 2)); - input22.add(new Tuple2<Integer, Integer>(3, 9)); - input22.add(new Tuple2<Integer, Integer>(3, 4)); - input22.add(new Tuple2<Integer, Integer>(4, 10)); - input22.add(new Tuple2<Integer, Integer>(5, 8)); - input22.add(new Tuple2<Integer, Integer>(5, 7)); - - List<Integer> expected2 = new ArrayList<Integer>(); - expected2.add(1); - expected2.add(2); - expected2.add(8); - expected2.add(7); - - List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22); - assertEquals(expected2, actual2); - } -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. 
You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.flink.streaming.api.operators.co; +// +//import static org.junit.Assert.assertEquals; +// +//import java.util.ArrayList; +//import java.util.HashSet; +//import java.util.List; +//import java.util.Set; +// +//import org.apache.flink.api.java.tuple.Tuple2; +//import org.apache.flink.streaming.api.functions.co.CoWindowFunction; +//import org.apache.flink.streaming.api.operators.co.CoStreamWindow; +//import org.apache.flink.streaming.api.windowing.helper.Timestamp; +//import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; +//import org.apache.flink.streaming.util.MockCoContext; +//import org.apache.flink.util.Collector; +//import org.junit.Test; +// +//public class CoWindowTest { +// +// public static final class MyCoGroup1 implements CoWindowFunction<Integer, Integer, Integer> { +// +// private static final long serialVersionUID = 1L; +// +// @SuppressWarnings("unused") +// @Override +// public void coWindow(List<Integer> first, List<Integer> second, Collector<Integer> out) +// throws Exception { +// Integer count1 = 0; +// for (Integer i : first) { +// count1++; +// } +// Integer count2 = 0; +// for (Integer i : second) { +// count2++; +// } +// out.collect(count1); +// out.collect(count2); +// +// } +// +// } +// +// public static final class MyCoGroup2 implements +// CoWindowFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> { +// +// private static final long serialVersionUID = 1L; +// +// @Override +// public void coWindow(List<Tuple2<Integer, Integer>> first, +// 
List<Tuple2<Integer, Integer>> second, Collector<Integer> out) throws Exception { +// +// Set<Integer> firstElements = new HashSet<Integer>(); +// for (Tuple2<Integer, Integer> value : first) { +// firstElements.add(value.f1); +// } +// for (Tuple2<Integer, Integer> value : second) { +// if (firstElements.contains(value.f1)) { +// out.collect(value.f1); +// } +// } +// +// } +// +// } +// +// private static final class MyTS1 implements Timestamp<Integer> { +// +// private static final long serialVersionUID = 1L; +// +// @Override +// public long getTimestamp(Integer value) { +// return value; +// } +// +// } +// +// private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> { +// +// private static final long serialVersionUID = 1L; +// +// @Override +// public long getTimestamp(Tuple2<Integer, Integer> value) { +// return value.f0; +// } +// +// } +// +// @Test +// public void coWindowGroupReduceTest2() throws Exception { +// +// CoStreamWindow<Integer, Integer, Integer> invokable1 = new CoStreamWindow<Integer, Integer, Integer>( +// new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1), +// new TimestampWrapper<Integer>(new MyTS1(), 1)); +// +// // Windowsize 2, slide 1 +// // 1,2|2,3|3,4|4,5 +// +// List<Integer> input11 = new ArrayList<Integer>(); +// input11.add(1); +// input11.add(1); +// input11.add(2); +// input11.add(3); +// input11.add(3); +// +// List<Integer> input12 = new ArrayList<Integer>(); +// input12.add(1); +// input12.add(2); +// input12.add(3); +// input12.add(3); +// input12.add(5); +// +// // Windows: (1,1,2)(1,1,2)|(2,3,3)(2,3,3)|(3,3)(3,3)|(5)(5) +// // expected output: 3,2|3,3|2,2|0,1 +// +// List<Integer> expected1 = new ArrayList<Integer>(); +// expected1.add(3); +// expected1.add(2); +// expected1.add(3); +// expected1.add(3); +// expected1.add(2); +// expected1.add(2); +// expected1.add(0); +// expected1.add(1); +// +// List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, 
input12); +// assertEquals(expected1, actual1); +// +// CoStreamWindow<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoStreamWindow<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>( +// new MyCoGroup2(), 2, 3, new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), +// 1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1)); +// +// // WindowSize 2, slide 3 +// // 1,2|4,5|7,8| +// +// List<Tuple2<Integer, Integer>> input21 = new ArrayList<Tuple2<Integer, Integer>>(); +// input21.add(new Tuple2<Integer, Integer>(1, 1)); +// input21.add(new Tuple2<Integer, Integer>(1, 2)); +// input21.add(new Tuple2<Integer, Integer>(2, 3)); +// input21.add(new Tuple2<Integer, Integer>(3, 4)); +// input21.add(new Tuple2<Integer, Integer>(3, 5)); +// input21.add(new Tuple2<Integer, Integer>(4, 6)); +// input21.add(new Tuple2<Integer, Integer>(4, 7)); +// input21.add(new Tuple2<Integer, Integer>(5, 8)); +// +// List<Tuple2<Integer, Integer>> input22 = new ArrayList<Tuple2<Integer, Integer>>(); +// input22.add(new Tuple2<Integer, Integer>(1, 1)); +// input22.add(new Tuple2<Integer, Integer>(2, 0)); +// input22.add(new Tuple2<Integer, Integer>(2, 2)); +// input22.add(new Tuple2<Integer, Integer>(3, 9)); +// input22.add(new Tuple2<Integer, Integer>(3, 4)); +// input22.add(new Tuple2<Integer, Integer>(4, 10)); +// input22.add(new Tuple2<Integer, Integer>(5, 8)); +// input22.add(new Tuple2<Integer, Integer>(5, 7)); +// +// List<Integer> expected2 = new ArrayList<Integer>(); +// expected2.add(1); +// expected2.add(2); +// expected2.add(8); +// expected2.add(7); +// +// List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22); +// assertEquals(expected2, actual2); +// } +//} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java 
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java index c8b0ae3..f111890 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java @@ -25,10 +25,9 @@ import java.util.List; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.operators.windowing.ParallelGroupedMerge; -import org.apache.flink.streaming.api.operators.windowing.ParallelMerge; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.windowing.StreamWindow; -import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector; +import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput; import org.junit.Test; public class ParallelMergeTest { @@ -45,37 +44,38 @@ public class ParallelMergeTest { } }; - TestCollector<StreamWindow<Integer>> out = new TestCollector<StreamWindow<Integer>>(); - List<StreamWindow<Integer>> output = out.getCollected(); + TestOutput<StreamWindow<Integer>> output = new TestOutput<StreamWindow<Integer>>(); + TimestampedCollector<StreamWindow<Integer>> collector = new TimestampedCollector<StreamWindow<Integer>>(output); + List<StreamWindow<Integer>> result = output.getCollected(); ParallelMerge<Integer> merger = new ParallelMerge<Integer>(reducer); merger.numberOfDiscretizers = 2; - merger.flatMap1(createTestWindow(1), out); - 
merger.flatMap1(createTestWindow(1), out); - merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out); - assertTrue(output.isEmpty()); - merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out); - assertEquals(StreamWindow.fromElements(2), output.get(0)); - - merger.flatMap2(new Tuple2<Integer, Integer>(2, 2), out); - merger.flatMap1(createTestWindow(2), out); - merger.flatMap1(createTestWindow(2), out); - merger.flatMap2(new Tuple2<Integer, Integer>(2, 1), out); - assertEquals(1, output.size()); - merger.flatMap1(createTestWindow(2), out); - assertEquals(StreamWindow.fromElements(3), output.get(1)); + merger.flatMap1(createTestWindow(1), collector); + merger.flatMap1(createTestWindow(1), collector); + merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector); + assertTrue(result.isEmpty()); + merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector); + assertEquals(StreamWindow.fromElements(2), result.get(0)); + + merger.flatMap2(new Tuple2<Integer, Integer>(2, 2), collector); + merger.flatMap1(createTestWindow(2), collector); + merger.flatMap1(createTestWindow(2), collector); + merger.flatMap2(new Tuple2<Integer, Integer>(2, 1), collector); + assertEquals(1, result.size()); + merger.flatMap1(createTestWindow(2), collector); + assertEquals(StreamWindow.fromElements(3), result.get(1)); // check error handling - merger.flatMap1(createTestWindow(3), out); - merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), out); - merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), out); + merger.flatMap1(createTestWindow(3), collector); + merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), collector); + merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), collector); - merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), out); - merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), out); - merger.flatMap1(createTestWindow(4), out); + merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), collector); + merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), collector); + 
merger.flatMap1(createTestWindow(4), collector); try { - merger.flatMap1(createTestWindow(4), out); + merger.flatMap1(createTestWindow(4), collector); fail(); } catch (RuntimeException e) { // Do nothing @@ -83,12 +83,12 @@ public class ParallelMergeTest { ParallelMerge<Integer> merger2 = new ParallelMerge<Integer>(reducer); merger2.numberOfDiscretizers = 2; - merger2.flatMap1(createTestWindow(0), out); - merger2.flatMap1(createTestWindow(1), out); - merger2.flatMap1(createTestWindow(1), out); - merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), out); + merger2.flatMap1(createTestWindow(0), collector); + merger2.flatMap1(createTestWindow(1), collector); + merger2.flatMap1(createTestWindow(1), collector); + merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector); try { - merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), out); + merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector); fail(); } catch (RuntimeException e) { // Do nothing @@ -99,18 +99,19 @@ public class ParallelMergeTest { @Test public void groupedTest() throws Exception { - TestCollector<StreamWindow<Integer>> out = new TestCollector<StreamWindow<Integer>>(); - List<StreamWindow<Integer>> output = out.getCollected(); + TestOutput<StreamWindow<Integer>> output = new TestOutput<StreamWindow<Integer>>(); + TimestampedCollector<StreamWindow<Integer>> collector = new TimestampedCollector<StreamWindow<Integer>>(output); + List<StreamWindow<Integer>> result = output.getCollected(); ParallelMerge<Integer> merger = new ParallelGroupedMerge<Integer>(); merger.numberOfDiscretizers = 2; - merger.flatMap1(createTestWindow(1), out); - merger.flatMap1(createTestWindow(1), out); - merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out); - assertTrue(output.isEmpty()); - merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out); - assertEquals(StreamWindow.fromElements(1, 1), output.get(0)); + merger.flatMap1(createTestWindow(1), collector); + merger.flatMap1(createTestWindow(1), collector); + 
merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector); + assertTrue(result.isEmpty()); + merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector); + assertEquals(StreamWindow.fromElements(1, 1), result.get(0)); } private StreamWindow<Integer> createTestWindow(Integer id) {