http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java index e06e0ef..403dd17 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java @@ -24,7 +24,7 @@ import java.util.Arrays; import java.util.List; import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.streaming.util.MockInvokable; +import org.apache.flink.streaming.util.MockContext; import org.junit.Test; public class FilterTest implements Serializable { @@ -44,7 +44,7 @@ public class FilterTest implements Serializable { FilterInvokable<Integer> invokable = new FilterInvokable<Integer>(new MyFilter()); List<Integer> expected = Arrays.asList(2, 4, 6); - List<Integer> actual = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7)); assertEquals(expected, actual); }
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java index a89de50..7424e21 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java @@ -23,7 +23,7 @@ import java.util.Arrays; import java.util.List; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.streaming.util.MockInvokable; +import org.apache.flink.streaming.util.MockContext; import org.apache.flink.util.Collector; import org.junit.Test; @@ -47,7 +47,7 @@ public class FlatMapTest { FlatMapInvokable<Integer, Integer> invokable = new FlatMapInvokable<Integer, Integer>(new MyFlatMap()); List<Integer> expected = Arrays.asList(2, 4, 4, 16, 6, 36, 8, 64); - List<Integer> actual = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)); + List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)); assertEquals(expected, actual); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java index 0b68207..ceaccf3 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java @@ -23,7 +23,7 @@ import java.util.Arrays; import java.util.List; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.streaming.util.MockInvokable; +import org.apache.flink.streaming.util.MockContext; import org.apache.flink.streaming.util.keys.ObjectKeySelector; import org.junit.Test; @@ -46,7 +46,7 @@ public class GroupedReduceInvokableTest { new MyReducer(), new ObjectKeySelector<Integer>()); List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3); - List<Integer> actual = MockInvokable.createAndExecute(invokable1, + List<Integer> actual = MockContext.createAndExecute(invokable1, Arrays.asList(1, 1, 2, 2, 3)); assertEquals(expected, actual); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java index 5ac5529..c3a48d5 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java @@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy; -import org.apache.flink.streaming.util.MockInvokable; +import org.apache.flink.streaming.util.MockContext; import org.apache.flink.streaming.util.keys.TupleKeySelector; import org.junit.Test; @@ -206,7 +206,7 @@ public class GroupedWindowInvokableTest { GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>( reduceFunction, keySelector, triggers, evictions, centralTriggers, null); - List<Integer> result = MockInvokable.createAndExecute(invokable, inputs); + List<Integer> result = MockContext.createAndExecute(invokable, inputs); List<Integer> actual = new LinkedList<Integer>(); for (Integer current : result) { @@ -225,7 +225,7 @@ public class GroupedWindowInvokableTest { invokable = new GroupedWindowInvokable<Integer, Integer>( reduceFunction, keySelector, triggers, null, centralTriggers,centralEvictions); - result = MockInvokable.createAndExecute(invokable, inputs); + result = MockContext.createAndExecute(invokable, inputs); actual = new LinkedList<Integer>(); for (Integer current : result) { actual.add(current); @@ -282,7 +282,7 @@ public class GroupedWindowInvokableTest { }, new TupleKeySelector<Tuple2<Integer, String>>(1), triggers, evictions, centralTriggers, null); - List<Tuple2<Integer, String>> result = MockInvokable.createAndExecute(invokable2, inputs2); + List<Tuple2<Integer, String>> result = MockContext.createAndExecute(invokable2, inputs2); List<Tuple2<Integer, String>> actual2 = new LinkedList<Tuple2<Integer, String>>(); for (Tuple2<Integer, String> current : result) { @@ -391,7 +391,7 @@ public class GroupedWindowInvokableTest { distributedTriggers, evictions, triggers, null); ArrayList<Tuple2<Integer, String>> result = new ArrayList<Tuple2<Integer, String>>(); - for (Tuple2<Integer, String> t : MockInvokable.createAndExecute(invokable, inputs)) { + for (Tuple2<Integer, String> t : MockContext.createAndExecute(invokable, inputs)) { result.add(t); } @@ -411,7 +411,7 @@ public class GroupedWindowInvokableTest { distributedTriggers, evictions, triggers, centralEvictions); result = new ArrayList<Tuple2<Integer, String>>(); - for (Tuple2<Integer, String> t : MockInvokable.createAndExecute(invokable, inputs)) { + for (Tuple2<Integer, String> t : MockContext.createAndExecute(invokable, inputs)) { result.add(t); } @@ -480,7 +480,7 @@ public class GroupedWindowInvokableTest { }, distributedTriggers, evictions, triggers, null); ArrayList<Integer> result = new ArrayList<Integer>(); - for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) { + for (Integer t : MockContext.createAndExecute(invokable, inputs)) { result.add(t); } @@ -556,7 +556,7 @@ public class GroupedWindowInvokableTest { }, distributedTriggers, evictions, triggers, null); ArrayList<Integer> result = new ArrayList<Integer>(); - for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) { + for (Integer t : MockContext.createAndExecute(invokable, inputs)) { result.add(t); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java index 7124ff8..5390ec9 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java @@ -23,7 +23,7 @@ import java.util.Arrays; import java.util.List; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.streaming.util.MockInvokable; +import org.apache.flink.streaming.util.MockContext; import org.junit.Test; public class MapTest { @@ -42,7 +42,7 @@ public class MapTest { MapInvokable<Integer, String> invokable = new MapInvokable<Integer, String>(new Map()); List<String> expectedList = Arrays.asList("+2", "+3", "+4"); - List<String> actualList = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3)); + List<String> actualList = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3)); assertEquals(expectedList, actualList); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java index 288d4ee..11c44cd 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java @@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.datastream.StreamProjection; -import org.apache.flink.streaming.util.MockInvokable; +import org.apache.flink.streaming.util.MockContext; import org.junit.Test; public class ProjectTest implements Serializable { @@ -62,6 +62,6 @@ public class ProjectTest implements Serializable { expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c")); expected.add(new Tuple3<Integer, Integer, String>(7, 7, "a")); - assertEquals(expected, MockInvokable.createAndExecute(invokable, input)); + assertEquals(expected, MockContext.createAndExecute(invokable, input)); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java index 68b8f8e..ae866e6 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java @@ -23,7 +23,7 @@ import java.util.Arrays; import java.util.List; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.streaming.util.MockInvokable; +import org.apache.flink.streaming.util.MockContext; import org.junit.Test; public class StreamReduceTest { @@ -45,7 +45,7 @@ public class StreamReduceTest { new MyReducer()); List<Integer> expected = Arrays.asList(1,2,4,7,10); - List<Integer> actual = MockInvokable.createAndExecute(invokable1, + List<Integer> actual = MockContext.createAndExecute(invokable1, Arrays.asList(1, 1, 2, 3, 3)); assertEquals(expected, actual); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java index 612da84..421a999 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java @@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; -import org.apache.flink.streaming.util.MockInvokable; +import org.apache.flink.streaming.util.MockContext; import org.junit.Test; public class WindowInvokableTest { @@ -98,7 +98,7 @@ public class WindowInvokableTest { myReduceFunction, triggers, evictions); ArrayList<Integer> result = new ArrayList<Integer>(); - for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) { + for (Integer t : MockContext.createAndExecute(invokable, inputs)) { result.add(t); } @@ -148,7 +148,7 @@ public class WindowInvokableTest { expected.add(24); expected.add(19); List<Integer> result = new ArrayList<Integer>(); - for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) { + for (Integer t : MockContext.createAndExecute(invokable, inputs)) { result.add(t); } assertEquals(expected, result); @@ -199,7 +199,7 @@ public class WindowInvokableTest { expected2.add(-4); result = new ArrayList<Integer>(); - for (Integer t : MockInvokable.createAndExecute(invokable2, inputs2)) { + for (Integer t : MockContext.createAndExecute(invokable2, inputs2)) { result.add(t); } @@ -253,7 +253,7 @@ public class WindowInvokableTest { myReduceFunction, triggers, evictions); ArrayList<Integer> result = new ArrayList<Integer>(); - for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) { + for (Integer t : MockContext.createAndExecute(invokable, inputs)) { result.add(t); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java new file mode 100644 index 0000000..ea94f98 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.StreamConfig; +import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; +import org.apache.flink.streaming.api.streamrecord.StreamRecord; +import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.api.streamvertex.StreamTaskContext; +import org.apache.flink.streaming.io.CoReaderIterator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> { + // private Collection<IN1> input1; + // private Collection<IN2> input2; + private Iterator<IN1> inputIterator1; + private Iterator<IN2> inputIterator2; + private List<OUT> outputs; + + private Collector<OUT> collector; + private StreamRecordSerializer<IN1> inDeserializer1; + private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> mockIterator; + private StreamRecordSerializer<IN2> inDeserializer2; + + public MockCoContext(Collection<IN1> input1, Collection<IN2> input2) { + + if (input1.isEmpty() || input2.isEmpty()) { + throw new RuntimeException("Inputs must not be empty"); + } + + this.inputIterator1 = input1.iterator(); + this.inputIterator2 = input2.iterator(); + + TypeInformation<IN1> inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next()); + inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1); + TypeInformation<IN2> inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next()); + inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2); + + mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2); + + outputs = new ArrayList<OUT>(); + collector = new MockCollector<OUT>(outputs); + } + + private int currentInput = 1; + private StreamRecord<IN1> reuse1; + private StreamRecord<IN2> reuse2; + + private class MockCoReaderIterator extends + CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> { + + public MockCoReaderIterator(TypeSerializer<StreamRecord<IN1>> serializer1, + TypeSerializer<StreamRecord<IN2>> serializer2) { + super(null, serializer1, serializer2); + reuse1 = inDeserializer1.createInstance(); + reuse2 = inDeserializer2.createInstance(); + } + + @Override + public int next(StreamRecord<IN1> target1, StreamRecord<IN2> target2) throws IOException { + this.delegate1.setInstance(target1); + this.delegate2.setInstance(target2); + + int inputNumber = nextRecord(); + target1.setObject(reuse1.getObject()); + target2.setObject(reuse2.getObject()); + + return inputNumber; + } + } + + private Integer nextRecord() { + if (inputIterator1.hasNext() && inputIterator2.hasNext()) { + switch (currentInput) { + case 1: + return next1(); + case 2: + return next2(); + default: + return 0; + } + } + + if (inputIterator1.hasNext()) { + return next1(); + } + + if (inputIterator2.hasNext()) { + return next2(); + } + + return 0; + } + + private int next1() { + reuse1 = inDeserializer1.createInstance(); + reuse1.setObject(inputIterator1.next()); + currentInput = 2; + return 1; + } + + private int next2() { + reuse2 = inDeserializer2.createInstance(); + reuse2.setObject(inputIterator2.next()); + currentInput = 1; + return 2; + } + + public List<OUT> getOutputs() { + return outputs; + } + + public Collector<OUT> getCollector() { + return collector; + } + + public StreamRecordSerializer<IN1> getInDeserializer1() { + return inDeserializer1; + } + + public StreamRecordSerializer<IN2> getInDeserializer2() { + return inDeserializer2; + } + + public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> getIterator() { + return mockIterator; + } + + public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable, + List<IN1> input1, List<IN2> input2) { + MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2); + invokable.setup(mockContext); + + try { + invokable.open(null); + invokable.invoke(); + invokable.close(); + } catch (Exception e) { + throw new RuntimeException("Cannot invoke invokable.", e); + } + + return mockContext.getOutputs(); + } + + @Override + public StreamConfig getConfig() { + return null; + } + + @Override + public ClassLoader getUserCodeClassLoader() { + return null; + } + + @SuppressWarnings("unchecked") + @Override + public <X> MutableObjectIterator<X> getInput(int index) { + switch (index) { + case 0: + return (MutableObjectIterator<X>) inputIterator1; + case 1: + return (MutableObjectIterator<X>) inputIterator2; + default: + throw new IllegalArgumentException("CoStreamVertex has only 2 inputs"); + } + } + + @SuppressWarnings("unchecked") + @Override + public <X> StreamRecordSerializer<X> getInputSerializer(int index) { + switch (index) { + case 0: + return (StreamRecordSerializer<X>) inDeserializer1; + case 1: + return (StreamRecordSerializer<X>) inDeserializer2; + default: + throw new IllegalArgumentException("CoStreamVertex has only 2 inputs"); + } + } + + @SuppressWarnings("unchecked") + @Override + public <X, Y> CoReaderIterator<X, Y> getCoReader() { + return (CoReaderIterator<X, Y>) mockIterator; + } + + @Override + public Collector<OUT> getOutputCollector() { + return collector; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java deleted file mode 100644 index 39d3ab4..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; -import org.apache.flink.streaming.io.CoReaderIterator; -import org.apache.flink.util.Collector; - -public class MockCoInvokable<IN1, IN2, OUT> { - // private Collection<IN1> input1; - // private Collection<IN2> input2; - private Iterator<IN1> inputIterator1; - private Iterator<IN2> inputIterator2; - private List<OUT> outputs; - - private Collector<OUT> collector; - private StreamRecordSerializer<IN1> inDeserializer1; - private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> mockIterator; - private StreamRecordSerializer<IN2> inDeserializer2; - - public MockCoInvokable(Collection<IN1> input1, Collection<IN2> input2) { - - if (input1.isEmpty() || input2.isEmpty()) { - throw new RuntimeException("Inputs must not be empty"); - } - - this.inputIterator1 = input1.iterator(); - this.inputIterator2 = input2.iterator(); - - TypeInformation<IN1> inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next()); - inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1); - TypeInformation<IN2> inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next()); - inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2); - - mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2); - - outputs = new ArrayList<OUT>(); - collector = new MockCollector<OUT>(outputs); - } - - private int currentInput = 1; - private StreamRecord<IN1> reuse1; - private StreamRecord<IN2> reuse2; - - private class MockCoReaderIterator extends - CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> { - - public MockCoReaderIterator( - TypeSerializer<StreamRecord<IN1>> serializer1, - TypeSerializer<StreamRecord<IN2>> serializer2) { - super(null, serializer1, serializer2); - reuse1 = inDeserializer1.createInstance(); - reuse2 = inDeserializer2.createInstance(); - } - - @Override - public int next(StreamRecord<IN1> target1, StreamRecord<IN2> target2) throws IOException { - this.delegate1.setInstance(target1); - this.delegate2.setInstance(target2); - - int inputNumber = nextRecord(); - target1.setObject(reuse1.getObject()); - target2.setObject(reuse2.getObject()); - - return inputNumber; - } - } - - private Integer nextRecord() { - if (inputIterator1.hasNext() && inputIterator2.hasNext()) { - switch (currentInput) { - case 1: - return next1(); - case 2: - return next2(); - default: - return 0; - } - } - - if (inputIterator1.hasNext()) { - return next1(); - } - - if (inputIterator2.hasNext()) { - return next2(); - } - - return 0; - } - - private int next1() { - reuse1 = inDeserializer1.createInstance(); - reuse1.setObject(inputIterator1.next()); - currentInput = 2; - return 1; - } - - private int next2() { - reuse2 = inDeserializer2.createInstance(); - reuse2.setObject(inputIterator2.next()); - currentInput = 1; - return 2; - } - - public List<OUT> getOutputs() { - return outputs; - } - - public Collector<OUT> getCollector() { - return collector; - } - - public StreamRecordSerializer<IN1> getInDeserializer1() { - return inDeserializer1; - } - - public StreamRecordSerializer<IN2> getInDeserializer2() { - return inDeserializer2; - } - - public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> getIterator() { - return mockIterator; - } - - public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable, - List<IN1> input1, List<IN2> input2) { - MockCoInvokable<IN1, IN2, OUT> mock = new MockCoInvokable<IN1, IN2, OUT>(input1, input2); - invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer1(), - mock.getInDeserializer2(), false); - - try { - invokable.open(null); - invokable.invoke(); - invokable.close(); - } catch (Exception e) { - throw new RuntimeException("Cannot invoke invokable.", e); - } - - return mock.getOutputs(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java new file mode 100644 index 0000000..87bedb2 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.StreamConfig; +import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.streamrecord.StreamRecord; +import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.api.streamvertex.StreamTaskContext; +import org.apache.flink.streaming.io.CoReaderIterator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +public class MockContext<IN, OUT> implements StreamTaskContext<OUT> { + private Collection<IN> inputs; + private List<OUT> outputs; + + private Collector<OUT> collector; + private StreamRecordSerializer<IN> inDeserializer; + private MutableObjectIterator<StreamRecord<IN>> iterator; + + public MockContext(Collection<IN> inputs) { + this.inputs = inputs; + if (inputs.isEmpty()) { + throw new RuntimeException("Inputs must not be empty"); + } + + TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next()); + inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo); + + iterator = new MockInputIterator(); + outputs = new ArrayList<OUT>(); + collector = new MockCollector<OUT>(outputs); + } + + private class MockInputIterator implements MutableObjectIterator<StreamRecord<IN>> { + Iterator<IN> listIterator; + + public MockInputIterator() { + listIterator = inputs.iterator(); + } + + @Override + public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException { + if (listIterator.hasNext()) { + reuse.setObject(listIterator.next()); + } else { + reuse = null; + } + return reuse; + } + } + + public List<OUT> getOutputs() { + return outputs; + } + + public Collector<OUT> getCollector() { + return collector; + } + + public StreamRecordSerializer<IN> getInDeserializer() { + return inDeserializer; + } + + public MutableObjectIterator<StreamRecord<IN>> getIterator() { + return iterator; + } + + public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable, + List<IN> inputs) { + MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs); + invokable.setup(mockContext); + try { + invokable.open(null); + invokable.invoke(); + invokable.close(); + } catch (Exception e) { + throw new RuntimeException("Cannot invoke invokable.", e); + } + + return mockContext.getOutputs(); + } + + @Override + public StreamConfig getConfig() { + return null; + } + + @Override + public ClassLoader getUserCodeClassLoader() { + return null; + } + + @SuppressWarnings("unchecked") + @Override + public <X> MutableObjectIterator<X> getInput(int index) { + if (index == 0) { + return (MutableObjectIterator<X>) iterator; + } else { + throw new IllegalArgumentException("There is only 1 input"); + } + } + + @SuppressWarnings("unchecked") + @Override + public <X> StreamRecordSerializer<X> getInputSerializer(int index) { + if (index == 0) { + return (StreamRecordSerializer<X>) inDeserializer; + } else { + throw new IllegalArgumentException("There is only 1 input"); + } + } + + @Override + public Collector<OUT> getOutputCollector() { + return collector; + } + + @Override + public <X, Y> CoReaderIterator<X, Y> getCoReader() { + throw new IllegalArgumentException("CoReader not available"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java deleted file mode 100644 index c06f53a..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.invokable.StreamInvokable; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; -import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; - -public class MockInvokable<IN, OUT> { - private Collection<IN> inputs; - private List<OUT> outputs; - - private Collector<OUT> collector; - private StreamRecordSerializer<IN> inDeserializer; - private MutableObjectIterator<StreamRecord<IN>> iterator; - - public MockInvokable(Collection<IN> inputs) { - this.inputs = inputs; - if (inputs.isEmpty()) { - throw new RuntimeException("Inputs must not be empty"); - } - - TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next()); - inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo); - - iterator = new MockInputIterator(); - outputs = new ArrayList<OUT>(); - collector = new MockCollector<OUT>(outputs); - } - - - private class MockInputIterator implements MutableObjectIterator<StreamRecord<IN>> { - Iterator<IN> listIterator; - - public MockInputIterator() { - listIterator = inputs.iterator(); - } - - @Override - public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException { - if (listIterator.hasNext()) { - reuse.setObject(listIterator.next()); - } else { - reuse = null; - } - return reuse; - } - } - - public List<OUT> getOutputs() { - return outputs; - } - - public Collector<OUT> getCollector() { - return collector; - } - - public StreamRecordSerializer<IN> getInDeserializer() { - return inDeserializer; - } - - public MutableObjectIterator<StreamRecord<IN>> getIterator() { - return iterator; - } - - public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable, List<IN> inputs) { - MockInvokable<IN, OUT> mock = new MockInvokable<IN, OUT>(inputs); - invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer(), false); - try { - invokable.open(null); - invokable.invoke(); - invokable.close(); - } catch (Exception e) { - throw new RuntimeException("Cannot invoke invokable.", e); - } - - return mock.getOutputs(); - } - -}
