http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
index e06e0ef..403dd17 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.junit.Test;
 
 public class FilterTest implements Serializable {
@@ -44,7 +44,7 @@ public class FilterTest implements Serializable {
                FilterInvokable<Integer> invokable = new 
FilterInvokable<Integer>(new MyFilter());
 
                List<Integer> expected = Arrays.asList(2, 4, 6);
-               List<Integer> actual = 
MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7));
+               List<Integer> actual = MockContext.createAndExecute(invokable, 
Arrays.asList(1, 2, 3, 4, 5, 6, 7));
                
                assertEquals(expected, actual);
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
index a89de50..7424e21 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -47,7 +47,7 @@ public class FlatMapTest {
                FlatMapInvokable<Integer, Integer> invokable = new 
FlatMapInvokable<Integer, Integer>(new MyFlatMap());
                
                List<Integer> expected = Arrays.asList(2, 4, 4, 16, 6, 36, 8, 
64);
-               List<Integer> actual = 
MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 
8));
+               List<Integer> actual = MockContext.createAndExecute(invokable, 
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
                
                assertEquals(expected, actual);
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
index 0b68207..ceaccf3 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.streaming.util.keys.ObjectKeySelector;
 import org.junit.Test;
 
@@ -46,7 +46,7 @@ public class GroupedReduceInvokableTest {
                                new MyReducer(), new 
ObjectKeySelector<Integer>());
 
                List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3);
-               List<Integer> actual = 
MockInvokable.createAndExecute(invokable1,
+               List<Integer> actual = MockContext.createAndExecute(invokable1,
                                Arrays.asList(1, 1, 2, 2, 3));
 
                assertEquals(expected, actual);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
index 5ac5529..c3a48d5 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
@@ -39,7 +39,7 @@ import 
org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
@@ -206,7 +206,7 @@ public class GroupedWindowInvokableTest {
                GroupedWindowInvokable<Integer, Integer> invokable = new 
GroupedWindowInvokable<Integer, Integer>(
                                reduceFunction, keySelector, triggers, 
evictions, centralTriggers, null);
 
-               List<Integer> result = 
MockInvokable.createAndExecute(invokable, inputs);
+               List<Integer> result = MockContext.createAndExecute(invokable, 
inputs);
 
                List<Integer> actual = new LinkedList<Integer>();
                for (Integer current : result) {
@@ -225,7 +225,7 @@ public class GroupedWindowInvokableTest {
                invokable = new GroupedWindowInvokable<Integer, Integer>(
                                reduceFunction, keySelector, triggers, null, 
centralTriggers,centralEvictions);
                
-               result = MockInvokable.createAndExecute(invokable, inputs);
+               result = MockContext.createAndExecute(invokable, inputs);
                actual = new LinkedList<Integer>();
                for (Integer current : result) {
                        actual.add(current);
@@ -282,7 +282,7 @@ public class GroupedWindowInvokableTest {
                                }, new TupleKeySelector<Tuple2<Integer, 
String>>(1), triggers, evictions,
                                centralTriggers, null);
 
-               List<Tuple2<Integer, String>> result = 
MockInvokable.createAndExecute(invokable2, inputs2);
+               List<Tuple2<Integer, String>> result = 
MockContext.createAndExecute(invokable2, inputs2);
 
                List<Tuple2<Integer, String>> actual2 = new 
LinkedList<Tuple2<Integer, String>>();
                for (Tuple2<Integer, String> current : result) {
@@ -391,7 +391,7 @@ public class GroupedWindowInvokableTest {
                                distributedTriggers, evictions, triggers, null);
 
                ArrayList<Tuple2<Integer, String>> result = new 
ArrayList<Tuple2<Integer, String>>();
-               for (Tuple2<Integer, String> t : 
MockInvokable.createAndExecute(invokable, inputs)) {
+               for (Tuple2<Integer, String> t : 
MockContext.createAndExecute(invokable, inputs)) {
                        result.add(t);
                }
 
@@ -411,7 +411,7 @@ public class GroupedWindowInvokableTest {
                                distributedTriggers, evictions, triggers, 
centralEvictions);
 
                result = new ArrayList<Tuple2<Integer, String>>();
-               for (Tuple2<Integer, String> t : 
MockInvokable.createAndExecute(invokable, inputs)) {
+               for (Tuple2<Integer, String> t : 
MockContext.createAndExecute(invokable, inputs)) {
                        result.add(t);
                }
 
@@ -480,7 +480,7 @@ public class GroupedWindowInvokableTest {
                                }, distributedTriggers, evictions, triggers, 
null);
 
                ArrayList<Integer> result = new ArrayList<Integer>();
-               for (Integer t : MockInvokable.createAndExecute(invokable, 
inputs)) {
+               for (Integer t : MockContext.createAndExecute(invokable, 
inputs)) {
                        result.add(t);
                }
 
@@ -556,7 +556,7 @@ public class GroupedWindowInvokableTest {
                                }, distributedTriggers, evictions, triggers, 
null);
 
                ArrayList<Integer> result = new ArrayList<Integer>();
-               for (Integer t : MockInvokable.createAndExecute(invokable, 
inputs)) {
+               for (Integer t : MockContext.createAndExecute(invokable, 
inputs)) {
                        result.add(t);
                }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
index 7124ff8..5390ec9 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.junit.Test;
 
 public class MapTest {
@@ -42,7 +42,7 @@ public class MapTest {
                MapInvokable<Integer, String> invokable = new 
MapInvokable<Integer, String>(new Map());
                
                List<String> expectedList = Arrays.asList("+2", "+3", "+4");
-               List<String> actualList = 
MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3));
+               List<String> actualList = 
MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3));
                
                assertEquals(expectedList, actualList);
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
index 288d4ee..11c44cd 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.datastream.StreamProjection;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.junit.Test;
 
 public class ProjectTest implements Serializable {
@@ -62,6 +62,6 @@ public class ProjectTest implements Serializable {
                expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
                expected.add(new Tuple3<Integer, Integer, String>(7, 7, "a"));
 
-               assertEquals(expected, 
MockInvokable.createAndExecute(invokable, input));
+               assertEquals(expected, MockContext.createAndExecute(invokable, 
input));
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
index 68b8f8e..ae866e6 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.junit.Test;
 
 public class StreamReduceTest {
@@ -45,7 +45,7 @@ public class StreamReduceTest {
                                new MyReducer());
 
                List<Integer> expected = Arrays.asList(1,2,4,7,10);
-               List<Integer> actual = 
MockInvokable.createAndExecute(invokable1,
+               List<Integer> actual = MockContext.createAndExecute(invokable1,
                                Arrays.asList(1, 1, 2, 3, 3));
 
                assertEquals(expected, actual);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
index 612da84..421a999 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
@@ -31,7 +31,7 @@ import 
org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.junit.Test;
 
 public class WindowInvokableTest {
@@ -98,7 +98,7 @@ public class WindowInvokableTest {
                                myReduceFunction, triggers, evictions);
 
                ArrayList<Integer> result = new ArrayList<Integer>();
-               for (Integer t : MockInvokable.createAndExecute(invokable, 
inputs)) {
+               for (Integer t : MockContext.createAndExecute(invokable, 
inputs)) {
                        result.add(t);
                }
 
@@ -148,7 +148,7 @@ public class WindowInvokableTest {
                expected.add(24);
                expected.add(19);
                List<Integer> result = new ArrayList<Integer>();
-               for (Integer t : MockInvokable.createAndExecute(invokable, 
inputs)) {
+               for (Integer t : MockContext.createAndExecute(invokable, 
inputs)) {
                        result.add(t);
                }
                assertEquals(expected, result);
@@ -199,7 +199,7 @@ public class WindowInvokableTest {
                expected2.add(-4);
 
                result = new ArrayList<Integer>();
-               for (Integer t : MockInvokable.createAndExecute(invokable2, 
inputs2)) {
+               for (Integer t : MockContext.createAndExecute(invokable2, 
inputs2)) {
                        result.add(t);
                }
 
@@ -253,7 +253,7 @@ public class WindowInvokableTest {
                                myReduceFunction, triggers, evictions);
 
                ArrayList<Integer> result = new ArrayList<Integer>();
-               for (Integer t : MockInvokable.createAndExecute(invokable, 
inputs)) {
+               for (Integer t : MockContext.createAndExecute(invokable, 
inputs)) {
                        result.add(t);
                }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
new file mode 100644
index 0000000..ea94f98
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> {
+       // private Collection<IN1> input1;
+       // private Collection<IN2> input2;
+       private Iterator<IN1> inputIterator1;
+       private Iterator<IN2> inputIterator2;
+       private List<OUT> outputs;
+
+       private Collector<OUT> collector;
+       private StreamRecordSerializer<IN1> inDeserializer1;
+       private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> 
mockIterator;
+       private StreamRecordSerializer<IN2> inDeserializer2;
+
+       public MockCoContext(Collection<IN1> input1, Collection<IN2> input2) {
+
+               if (input1.isEmpty() || input2.isEmpty()) {
+                       throw new RuntimeException("Inputs must not be empty");
+               }
+
+               this.inputIterator1 = input1.iterator();
+               this.inputIterator2 = input2.iterator();
+
+               TypeInformation<IN1> inTypeInfo1 = 
TypeExtractor.getForObject(input1.iterator().next());
+               inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1);
+               TypeInformation<IN2> inTypeInfo2 = 
TypeExtractor.getForObject(input2.iterator().next());
+               inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2);
+
+               mockIterator = new MockCoReaderIterator(inDeserializer1, 
inDeserializer2);
+
+               outputs = new ArrayList<OUT>();
+               collector = new MockCollector<OUT>(outputs);
+       }
+
+       private int currentInput = 1;
+       private StreamRecord<IN1> reuse1;
+       private StreamRecord<IN2> reuse2;
+
+       private class MockCoReaderIterator extends
+                       CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> {
+
+               public MockCoReaderIterator(TypeSerializer<StreamRecord<IN1>> 
serializer1,
+                               TypeSerializer<StreamRecord<IN2>> serializer2) {
+                       super(null, serializer1, serializer2);
+                       reuse1 = inDeserializer1.createInstance();
+                       reuse2 = inDeserializer2.createInstance();
+               }
+
+               @Override
+               public int next(StreamRecord<IN1> target1, StreamRecord<IN2> 
target2) throws IOException {
+                       this.delegate1.setInstance(target1);
+                       this.delegate2.setInstance(target2);
+
+                       int inputNumber = nextRecord();
+                       target1.setObject(reuse1.getObject());
+                       target2.setObject(reuse2.getObject());
+
+                       return inputNumber;
+               }
+       }
+
+       private Integer nextRecord() {
+               if (inputIterator1.hasNext() && inputIterator2.hasNext()) {
+                       switch (currentInput) {
+                       case 1:
+                               return next1();
+                       case 2:
+                               return next2();
+                       default:
+                               return 0;
+                       }
+               }
+
+               if (inputIterator1.hasNext()) {
+                       return next1();
+               }
+
+               if (inputIterator2.hasNext()) {
+                       return next2();
+               }
+
+               return 0;
+       }
+
+       private int next1() {
+               reuse1 = inDeserializer1.createInstance();
+               reuse1.setObject(inputIterator1.next());
+               currentInput = 2;
+               return 1;
+       }
+
+       private int next2() {
+               reuse2 = inDeserializer2.createInstance();
+               reuse2.setObject(inputIterator2.next());
+               currentInput = 1;
+               return 2;
+       }
+
+       public List<OUT> getOutputs() {
+               return outputs;
+       }
+
+       public Collector<OUT> getCollector() {
+               return collector;
+       }
+
+       public StreamRecordSerializer<IN1> getInDeserializer1() {
+               return inDeserializer1;
+       }
+
+       public StreamRecordSerializer<IN2> getInDeserializer2() {
+               return inDeserializer2;
+       }
+
+       public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> 
getIterator() {
+               return mockIterator;
+       }
+
+       public static <IN1, IN2, OUT> List<OUT> 
createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
+                       List<IN1> input1, List<IN2> input2) {
+               MockCoContext<IN1, IN2, OUT> mockContext = new 
MockCoContext<IN1, IN2, OUT>(input1, input2);
+               invokable.setup(mockContext);
+
+               try {
+                       invokable.open(null);
+                       invokable.invoke();
+                       invokable.close();
+               } catch (Exception e) {
+                       throw new RuntimeException("Cannot invoke invokable.", 
e);
+               }
+
+               return mockContext.getOutputs();
+       }
+
+       @Override
+       public StreamConfig getConfig() {
+               return null;
+       }
+
+       @Override
+       public ClassLoader getUserCodeClassLoader() {
+               return null;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public <X> MutableObjectIterator<X> getInput(int index) {
+               switch (index) {
+               case 0:
+                       return (MutableObjectIterator<X>) inputIterator1;
+               case 1:
+                       return (MutableObjectIterator<X>) inputIterator2;
+               default:
+                       throw new IllegalArgumentException("CoStreamVertex has 
only 2 inputs");
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
+               switch (index) {
+               case 0:
+                       return (StreamRecordSerializer<X>) inDeserializer1;
+               case 1:
+                       return (StreamRecordSerializer<X>) inDeserializer2;
+               default:
+                       throw new IllegalArgumentException("CoStreamVertex has 
only 2 inputs");
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public <X, Y> CoReaderIterator<X, Y> getCoReader() {
+               return (CoReaderIterator<X, Y>) mockIterator;
+       }
+
+       @Override
+       public Collector<OUT> getOutputCollector() {
+               return collector;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
deleted file mode 100644
index 39d3ab4..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.util.Collector;
-
-public class MockCoInvokable<IN1, IN2, OUT> {
-       // private Collection<IN1> input1;
-       // private Collection<IN2> input2;
-       private Iterator<IN1> inputIterator1;
-       private Iterator<IN2> inputIterator2;
-       private List<OUT> outputs;
-
-       private Collector<OUT> collector;
-       private StreamRecordSerializer<IN1> inDeserializer1;
-       private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> 
mockIterator;
-       private StreamRecordSerializer<IN2> inDeserializer2;
-
-       public MockCoInvokable(Collection<IN1> input1, Collection<IN2> input2) {
-
-               if (input1.isEmpty() || input2.isEmpty()) {
-                       throw new RuntimeException("Inputs must not be empty");
-               }
-
-               this.inputIterator1 = input1.iterator();
-               this.inputIterator2 = input2.iterator();
-
-               TypeInformation<IN1> inTypeInfo1 = 
TypeExtractor.getForObject(input1.iterator().next());
-               inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1);
-               TypeInformation<IN2> inTypeInfo2 = 
TypeExtractor.getForObject(input2.iterator().next());
-               inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2);
-
-               mockIterator = new MockCoReaderIterator(inDeserializer1, 
inDeserializer2);
-
-               outputs = new ArrayList<OUT>();
-               collector = new MockCollector<OUT>(outputs);
-       }
-
-       private int currentInput = 1;
-       private StreamRecord<IN1> reuse1;
-       private StreamRecord<IN2> reuse2;
-       
-       private class MockCoReaderIterator extends
-                       CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> {
-
-               public MockCoReaderIterator(
-                               TypeSerializer<StreamRecord<IN1>> serializer1,
-                               TypeSerializer<StreamRecord<IN2>> serializer2) {
-                       super(null, serializer1, serializer2);
-                       reuse1 = inDeserializer1.createInstance();
-                       reuse2 = inDeserializer2.createInstance();
-               }
-
-               @Override
-               public int next(StreamRecord<IN1> target1, StreamRecord<IN2> 
target2) throws IOException {
-                       this.delegate1.setInstance(target1);
-                       this.delegate2.setInstance(target2);
-                       
-                       int inputNumber = nextRecord();
-                       target1.setObject(reuse1.getObject());
-                       target2.setObject(reuse2.getObject());
-                       
-                       return inputNumber;
-               }
-       }
-
-       private Integer nextRecord() {
-               if (inputIterator1.hasNext() && inputIterator2.hasNext()) {
-                       switch (currentInput) {
-                       case 1:
-                               return next1();
-                       case 2:
-                               return next2();
-                       default:
-                               return 0;
-                       }
-               }
-
-               if (inputIterator1.hasNext()) {
-                       return next1();
-               }
-
-               if (inputIterator2.hasNext()) {
-                       return next2();
-               }
-
-               return 0;
-       }
-
-       private int next1() {
-               reuse1 = inDeserializer1.createInstance();
-               reuse1.setObject(inputIterator1.next());
-               currentInput = 2;
-               return 1;
-       }
-
-       private int next2() {
-               reuse2 = inDeserializer2.createInstance();
-               reuse2.setObject(inputIterator2.next());
-               currentInput = 1;
-               return 2;
-       }
-
-       public List<OUT> getOutputs() {
-               return outputs;
-       }
-
-       public Collector<OUT> getCollector() {
-               return collector;
-       }
-
-       public StreamRecordSerializer<IN1> getInDeserializer1() {
-               return inDeserializer1;
-       }
-
-       public StreamRecordSerializer<IN2> getInDeserializer2() {
-               return inDeserializer2;
-       }
-
-       public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> 
getIterator() {
-               return mockIterator;
-       }
-
-       public static <IN1, IN2, OUT> List<OUT> 
createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
-                       List<IN1> input1, List<IN2> input2) {
-               MockCoInvokable<IN1, IN2, OUT> mock = new MockCoInvokable<IN1, 
IN2, OUT>(input1, input2);
-               invokable.initialize(mock.getCollector(), mock.getIterator(), 
mock.getInDeserializer1(),
-                               mock.getInDeserializer2(), false);
-
-               try {
-                       invokable.open(null);
-                       invokable.invoke();
-                       invokable.close();
-               } catch (Exception e) {
-                       throw new RuntimeException("Cannot invoke invokable.", 
e);
-               }
-
-               return mock.getOutputs();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
new file mode 100644
index 0000000..87bedb2
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
+       private Collection<IN> inputs;
+       private List<OUT> outputs;
+
+       private Collector<OUT> collector;
+       private StreamRecordSerializer<IN> inDeserializer;
+       private MutableObjectIterator<StreamRecord<IN>> iterator;
+
+       public MockContext(Collection<IN> inputs) {
+               this.inputs = inputs;
+               if (inputs.isEmpty()) {
+                       throw new RuntimeException("Inputs must not be empty");
+               }
+
+               TypeInformation<IN> inTypeInfo = 
TypeExtractor.getForObject(inputs.iterator().next());
+               inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo);
+
+               iterator = new MockInputIterator();
+               outputs = new ArrayList<OUT>();
+               collector = new MockCollector<OUT>(outputs);
+       }
+
+       private class MockInputIterator implements 
MutableObjectIterator<StreamRecord<IN>> {
+               Iterator<IN> listIterator;
+
+               public MockInputIterator() {
+                       listIterator = inputs.iterator();
+               }
+
+               @Override
+               public StreamRecord<IN> next(StreamRecord<IN> reuse) throws 
IOException {
+                       if (listIterator.hasNext()) {
+                               reuse.setObject(listIterator.next());
+                       } else {
+                               reuse = null;
+                       }
+                       return reuse;
+               }
+       }
+
+       public List<OUT> getOutputs() {
+               return outputs;
+       }
+
+       public Collector<OUT> getCollector() {
+               return collector;
+       }
+
+       public StreamRecordSerializer<IN> getInDeserializer() {
+               return inDeserializer;
+       }
+
+       public MutableObjectIterator<StreamRecord<IN>> getIterator() {
+               return iterator;
+       }
+
+       public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, 
OUT> invokable,
+                       List<IN> inputs) {
+               MockContext<IN, OUT> mockContext = new MockContext<IN, 
OUT>(inputs);
+               invokable.setup(mockContext);
+               try {
+                       invokable.open(null);
+                       invokable.invoke();
+                       invokable.close();
+               } catch (Exception e) {
+                       throw new RuntimeException("Cannot invoke invokable.", 
e);
+               }
+
+               return mockContext.getOutputs();
+       }
+
+       @Override
+       public StreamConfig getConfig() {
+               return null;
+       }
+
+       @Override
+       public ClassLoader getUserCodeClassLoader() {
+               return null;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public <X> MutableObjectIterator<X> getInput(int index) {
+               if (index == 0) {
+                       return (MutableObjectIterator<X>) iterator;
+               } else {
+                       throw new IllegalArgumentException("There is only 1 
input");
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
+               if (index == 0) {
+                       return (StreamRecordSerializer<X>) inDeserializer;
+               } else {
+                       throw new IllegalArgumentException("There is only 1 
input");
+               }
+       }
+
+       @Override
+       public Collector<OUT> getOutputCollector() {
+               return collector;
+       }
+
+       @Override
+       public <X, Y> CoReaderIterator<X, Y> getCoReader() {
+               throw new IllegalArgumentException("CoReader not available");
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
deleted file mode 100644
index c06f53a..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class MockInvokable<IN, OUT> {
-       private Collection<IN> inputs;
-       private List<OUT> outputs;
-
-       private Collector<OUT> collector;
-       private StreamRecordSerializer<IN> inDeserializer;
-       private MutableObjectIterator<StreamRecord<IN>> iterator;
-
-       public MockInvokable(Collection<IN> inputs) {
-               this.inputs = inputs;
-               if (inputs.isEmpty()) {
-                       throw new RuntimeException("Inputs must not be empty");
-               }
-
-               TypeInformation<IN> inTypeInfo = 
TypeExtractor.getForObject(inputs.iterator().next());
-               inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo);
-               
-               iterator = new MockInputIterator();
-               outputs = new ArrayList<OUT>();
-               collector = new MockCollector<OUT>(outputs);
-       }
-
-
-       private class MockInputIterator implements 
MutableObjectIterator<StreamRecord<IN>> {
-               Iterator<IN> listIterator;
-               
-               public MockInputIterator() {
-                       listIterator = inputs.iterator();
-               }
-
-               @Override
-               public StreamRecord<IN> next(StreamRecord<IN> reuse) throws 
IOException {
-                       if (listIterator.hasNext()) {
-                               reuse.setObject(listIterator.next());
-                       } else {
-                               reuse = null;
-                       }
-                       return reuse;
-               }
-       }
-
-       public List<OUT> getOutputs() {
-               return outputs;
-       }
-
-       public Collector<OUT> getCollector() {
-               return collector;
-       }
-
-       public StreamRecordSerializer<IN> getInDeserializer() {
-               return inDeserializer;
-       }
-
-       public MutableObjectIterator<StreamRecord<IN>> getIterator() {
-               return iterator;
-       }
-
-       public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, 
OUT> invokable, List<IN> inputs) {
-               MockInvokable<IN, OUT> mock = new MockInvokable<IN, 
OUT>(inputs);
-               invokable.initialize(mock.getCollector(), mock.getIterator(), 
mock.getInDeserializer(), false);
-               try {
-                       invokable.open(null);
-                       invokable.invoke();
-                       invokable.close();
-               } catch (Exception e) {
-                       throw new RuntimeException("Cannot invoke invokable.", 
e);
-               }
-               
-               return mock.getOutputs();
-       }
-       
-}

Reply via email to