[streaming] StreamInvokable rework for simpler logic and easier use

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/88f38e49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/88f38e49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/88f38e49

Branch: refs/heads/release-0.8
Commit: 88f38e49202354926cb4ec36390cbe34bad247a3
Parents: 15fb1da
Author: Gyula Fora <[email protected]>
Authored: Wed Dec 17 23:34:26 2014 +0100
Committer: mbalassi <[email protected]>
Committed: Thu Dec 18 18:52:48 2014 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  10 -
 .../flink/streaming/api/JobGraphBuilder.java    |   8 -
 .../flink/streaming/api/StreamConfig.java       |  13 +-
 .../streaming/api/datastream/DataStream.java    |   9 +-
 .../datastream/SingleOutputStreamOperator.java  |  18 --
 .../streaming/api/invokable/SinkInvokable.java  |  14 +-
 .../api/invokable/SourceInvokable.java          |  22 +-
 .../api/invokable/StreamInvokable.java          |  87 +++-----
 .../invokable/operator/CounterInvokable.java    |  23 +-
 .../api/invokable/operator/FilterInvokable.java |  24 +-
 .../invokable/operator/FlatMapInvokable.java    |  14 +-
 .../operator/GroupedReduceInvokable.java        |   4 +-
 .../operator/GroupedWindowInvokable.java        |  76 +++----
 .../api/invokable/operator/MapInvokable.java    |  14 +-
 .../invokable/operator/ProjectInvokable.java    |  11 +-
 .../operator/StreamReduceInvokable.java         |  14 +-
 .../api/invokable/operator/WindowInvokable.java |  29 +--
 .../operator/co/CoBatchReduceInvokable.java     |   9 +-
 .../api/invokable/operator/co/CoInvokable.java  |  45 +---
 .../operator/co/CoWindowInvokable.java          |   5 -
 .../api/streamvertex/CoStreamVertex.java        |  37 +++-
 .../api/streamvertex/StreamTaskContext.java     |  40 ++++
 .../api/streamvertex/StreamVertex.java          |  48 +++-
 .../streaming/api/AggregationFunctionTest.java  |  36 +--
 .../api/collector/DirectedOutputTest.java       |   2 +-
 .../invokable/operator/CoBatchReduceTest.java   |   6 +-
 .../api/invokable/operator/CoFlatMapTest.java   |   4 +-
 .../operator/CoGroupedBatchReduceTest.java      |   6 +-
 .../invokable/operator/CoGroupedReduceTest.java |   6 +-
 .../operator/CoGroupedWindowReduceTest.java     |   6 +-
 .../api/invokable/operator/CoMapTest.java       |   4 +-
 .../invokable/operator/CoStreamReduceTest.java  |   4 +-
 .../invokable/operator/CoWindowReduceTest.java  |   6 +-
 .../api/invokable/operator/CoWindowTest.java    |   6 +-
 .../operator/CounterInvokableTest.java          |   4 +-
 .../api/invokable/operator/FilterTest.java      |   4 +-
 .../api/invokable/operator/FlatMapTest.java     |   4 +-
 .../operator/GroupedReduceInvokableTest.java    |   4 +-
 .../operator/GroupedWindowInvokableTest.java    |  16 +-
 .../api/invokable/operator/MapTest.java         |   4 +-
 .../api/invokable/operator/ProjectTest.java     |   4 +-
 .../invokable/operator/StreamReduceTest.java    |   4 +-
 .../invokable/operator/WindowInvokableTest.java |  10 +-
 .../flink/streaming/util/MockCoContext.java     | 217 +++++++++++++++++++
 .../flink/streaming/util/MockCoInvokable.java   | 169 ---------------
 .../flink/streaming/util/MockContext.java       | 148 +++++++++++++
 .../flink/streaming/util/MockInvokable.java     | 105 ---------
 47 files changed, 665 insertions(+), 688 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 6e7f932..c51afbc 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -609,16 +609,6 @@ env.genereateSequence(1,10).map(new 
MyMapper()).setBufferTimeout(timeoutMillis);
 To maximise the throughput the user can call `.setBufferTimeout(-1)` which 
will remove the timeout and buffers will only be flushed when they are full.
 To minimise latency, set the timeout to a value close to 0 (fro example 5 or 
10 ms). Theoretically a buffer timeout of 0 will cause all outputs to be 
flushed when produced, but this setting should be avoided because it can cause 
severe performance degradation.
 
-### Mutability
-
-This is currently a beta feature and it is only supported for a subset of the 
available operators.
-
-Most operators allow setting mutability for reading input data. If the 
operator is set mutable then the variable used to store input data for 
operators will be reused in a mutable fashion to avoid excessive object 
creation. By default, all operators are set to immutable.
-Usage:
-
-~~~java
-operator.setMutability(isMutable)
-~~~
 
 [Back to top](#top)
     

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index d63042a..d66e388 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -64,7 +64,6 @@ public class JobGraphBuilder {
        private Map<String, List<Integer>> outEdgeType;
        private Map<String, List<List<String>>> outEdgeNames;
        private Map<String, List<Boolean>> outEdgeSelectAll;
-       private Map<String, Boolean> mutability;
        private Map<String, List<String>> inEdgeList;
        private Map<String, List<StreamPartitioner<?>>> connectionTypes;
        private Map<String, String> operatorNames;
@@ -97,7 +96,6 @@ public class JobGraphBuilder {
                outEdgeType = new HashMap<String, List<Integer>>();
                outEdgeNames = new HashMap<String, List<List<String>>>();
                outEdgeSelectAll = new HashMap<String, List<Boolean>>();
-               mutability = new HashMap<String, Boolean>();
                inEdgeList = new HashMap<String, List<String>>();
                connectionTypes = new HashMap<String, 
List<StreamPartitioner<?>>>();
                operatorNames = new HashMap<String, String>();
@@ -302,7 +300,6 @@ public class JobGraphBuilder {
 
                vertexClasses.put(vertexName, vertexClass);
                setParallelism(vertexName, parallelism);
-               mutability.put(vertexName, false);
                invokableObjects.put(vertexName, invokableObject);
                operatorNames.put(vertexName, operatorName);
                serializedFunctions.put(vertexName, serializedFunction);
@@ -355,7 +352,6 @@ public class JobGraphBuilder {
 
                StreamConfig config = new 
StreamConfig(vertex.getConfiguration());
 
-               config.setMutability(mutability.get(vertexName));
                config.setBufferTimeout(bufferTimeout.get(vertexName));
 
                config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
@@ -447,10 +443,6 @@ public class JobGraphBuilder {
                inputFormatList.put(vertexName, inputFormat);
        }
 
-       public void setMutability(String vertexName, boolean isMutable) {
-               mutability.put(vertexName, isMutable);
-       }
-
        public void setBufferTimeout(String vertexName, long bufferTimeout) {
                this.bufferTimeout.put(vertexName, bufferTimeout);
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 1d863a7..9800b63 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -57,14 +57,11 @@ public class StreamConfig {
        private static final String TYPE_SERIALIZER_IN_2 = 
"typeSerializer_in_2";
        private static final String TYPE_SERIALIZER_OUT_1 = 
"typeSerializer_out_1";
        private static final String TYPE_SERIALIZER_OUT_2 = 
"typeSerializer_out_2";
-       private static final String MUTABILITY = "isMutable";
        private static final String ITERATON_WAIT = "iterationWait";
 
        // DEFAULT VALUES
 
-       private static final boolean DEFAULT_IS_MUTABLE = false;
-
-       private static final long DEFAULT_TIMEOUT = 0;
+       private static final long DEFAULT_TIMEOUT = 100;
 
        // CONFIG METHODS
 
@@ -138,14 +135,6 @@ public class StreamConfig {
                config.setBytes(key, SerializationUtils.serialize(typeWrapper));
        }
 
-       public void setMutability(boolean isMutable) {
-               config.setBoolean(MUTABILITY, isMutable);
-       }
-
-       public boolean getMutability() {
-               return config.getBoolean(MUTABILITY, DEFAULT_IS_MUTABLE);
-       }
-
        public void setBufferTimeout(long timeout) {
                config.setLong(BUFFER_TIMEOUT, timeout);
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 474d57b..6e8da0a 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -366,7 +366,7 @@ public class DataStream<OUT> {
         * the data stream that will be fed back and used as the input for the
         * iteration head. A common usage pattern for streaming iterations is 
to use
         * output splitting to send a part of the closing data stream to the 
head.
-        * Refer to {@link SingleOutputStreamOperator#split(OutputSelector)} for
+        * Refer to {@link SingleOutputStreamOperator#split} for
         * more information.
         * <p>
         * The iteration edge will be partitioned the same way as the first 
input of
@@ -940,7 +940,6 @@ public class DataStream<OUT> {
                        WriteFormatAsText<OUT> format, long millis, OUT 
endTuple) {
                DataStreamSink<OUT> returnStream = addSink(inputStream, new 
WriteSinkFunctionByMillis<OUT>(
                                path, format, millis, endTuple), 
inputStream.typeInfo);
-               jobGraphBuilder.setMutability(returnStream.getId(), false);
                return returnStream;
        }
 
@@ -968,7 +967,6 @@ public class DataStream<OUT> {
                DataStreamSink<OUT> returnStream = addSink(inputStream,
                                new WriteSinkFunctionByBatches<OUT>(path, 
format, batchSize, endTuple),
                                inputStream.typeInfo);
-               jobGraphBuilder.setMutability(returnStream.getId(), false);
                return returnStream;
        }
 
@@ -1063,9 +1061,6 @@ public class DataStream<OUT> {
         * @return The closed DataStream
         */
        public DataStreamSink<OUT> writeAsCsv(String path, int batchSize, OUT 
endTuple) {
-               if (this instanceof SingleOutputStreamOperator) {
-                       ((SingleOutputStreamOperator<?, ?>) 
this).setMutability(false);
-               }
                return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), 
batchSize, endTuple);
        }
 
@@ -1091,7 +1086,6 @@ public class DataStream<OUT> {
                        WriteFormatAsCsv<OUT> format, long millis, OUT 
endTuple) {
                DataStreamSink<OUT> returnStream = addSink(inputStream, new 
WriteSinkFunctionByMillis<OUT>(
                                path, format, millis, endTuple), 
inputStream.typeInfo);
-               jobGraphBuilder.setMutability(returnStream.getId(), false);
                return returnStream;
        }
 
@@ -1119,7 +1113,6 @@ public class DataStream<OUT> {
                DataStreamSink<OUT> returnStream = addSink(inputStream,
                                new WriteSinkFunctionByBatches<OUT>(path, 
format, batchSize, endTuple),
                                inputStream.typeInfo);
-               jobGraphBuilder.setMutability(returnStream.getId(), false);
                return returnStream;
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 3e1c940..016322b 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -71,24 +71,6 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
                return this;
        }
 
-       /**
-        * This is a beta feature, use with care </br><br/>
-        * Sets the mutability of the operator. If the operator is set to 
mutable,
-        * the tuples received in the user defined functions, will be reused 
after
-        * the function call. Setting an operator to mutable reduces garbage
-        * collection overhead and thus increases scalability. Please note that 
if a
-        * {@link DataStream#batchReduce} or {@link DataStream#windowReduce} is 
used
-        * as mutable, the user can only iterate through the iterator once in 
every
-        * invoke.
-        * 
-        * @param isMutable
-        *            The mutability of the operator.
-        * @return The operator with mutability set.
-        */
-       public SingleOutputStreamOperator<OUT, O> setMutability(boolean 
isMutable) {
-               jobGraphBuilder.setMutability(id, isMutable);
-               return this;
-       }
 
        /**
         * Sets the maximum time frequency (ms) for the flushing of the output

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index ec33224..74591a8 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -30,23 +30,15 @@ public class SinkInvokable<IN> extends StreamInvokable<IN, 
IN> {
        }
 
        @Override
-       protected void immutableInvoke() throws Exception {
-               while ((reuse = recordIterator.next(reuse)) != null) {
-                       callUserFunctionAndLogException();
-                       resetReuse();
-               }
-       }
-
-       @Override
-       protected void mutableInvoke() throws Exception {
-               while ((reuse = recordIterator.next(reuse)) != null) {
+       public void invoke() throws Exception {
+               while (readNext() != null) {
                        callUserFunctionAndLogException();
                }
        }
 
        @Override
        protected void callUserFunction() throws Exception {
-               sinkFunction.invoke((IN) reuse.getObject());            
+               sinkFunction.invoke((IN) nextRecord.getObject());
        }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index 0cfe028..f1cf2c5 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,38 +21,24 @@ import java.io.Serializable;
 
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 
-public class SourceInvokable<OUT> extends StreamInvokable<OUT,OUT> implements 
Serializable {
+public class SourceInvokable<OUT> extends StreamInvokable<OUT, OUT> implements 
Serializable {
 
        private static final long serialVersionUID = 1L;
 
        private SourceFunction<OUT> sourceFunction;
 
-
        public SourceInvokable(SourceFunction<OUT> sourceFunction) {
                super(sourceFunction);
                this.sourceFunction = sourceFunction;
        }
 
        @Override
-       public void invoke() throws Exception {
-               sourceFunction.invoke(collector);
-       }
-
-       @Override
-       protected void immutableInvoke() throws Exception {             
-       }
-
-       @Override
-       protected void mutableInvoke() throws Exception {               
+       public void invoke() {
+               callUserFunctionAndLogException();
        }
 
        @Override
        protected void callUserFunction() throws Exception {
+               sourceFunction.invoke(collector);
        }
-       
-       @Override
-       public SourceFunction<OUT> getSourceFunction(){
-               return sourceFunction;
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index e587b93..d19d7ad 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -17,16 +17,16 @@
 
 package org.apache.flink.streaming.api.invokable;
 
+import java.io.IOException;
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.StringUtils;
@@ -45,9 +45,11 @@ public abstract class StreamInvokable<IN, OUT> implements 
Serializable {
        private static final long serialVersionUID = 1L;
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamInvokable.class);
 
+       protected StreamTaskContext<OUT> taskContext;
+
        protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
        protected StreamRecordSerializer<IN> inSerializer;
-       protected StreamRecord<IN> reuse;
+       protected StreamRecord<IN> nextRecord;
        protected boolean isMutable;
 
        protected Collector<OUT> collector;
@@ -61,48 +63,43 @@ public abstract class StreamInvokable<IN, OUT> implements 
Serializable {
        /**
         * Initializes the {@link StreamInvokable} for input and output handling
         * 
-        * @param collector
-        *            Collector object for collecting the outputs for the 
operator
-        * @param recordIterator
-        *            Iterator for reading in the input records
-        * @param serializer
-        *            Serializer used to deserialize inputs
-        * @param isMutable
-        *            Mutability setting for the operator
+        * @param taskContext
+        *            StreamTaskContext representing the vertex
         */
-       public void initialize(Collector<OUT> collector,
-                       MutableObjectIterator<StreamRecord<IN>> recordIterator,
-                       StreamRecordSerializer<IN> serializer, boolean 
isMutable) {
-               this.collector = collector;
-               this.recordIterator = recordIterator;
-               this.inSerializer = serializer;
+       public void setup(StreamTaskContext<OUT> taskContext) {
+               this.collector = taskContext.getOutputCollector();
+               this.recordIterator = taskContext.getInput(0);
+               this.inSerializer = taskContext.getInputSerializer(0);
                if (this.inSerializer != null) {
-                       this.reuse = serializer.createInstance();
+                       this.nextRecord = inSerializer.createInstance();
                }
-               this.isMutable = isMutable;
-       }
-
-       /**
-        * Re-initializes the object in which the next input record will be 
read in
-        */
-       protected void resetReuse() {
-               this.reuse = inSerializer.createInstance();
+               this.taskContext = taskContext;
        }
 
        /**
-        * Method that will be called if the mutability setting is set to 
immutable
+        * Method that will be called when the operator starts, should encode 
the
+        * processing logic
         */
-       protected abstract void immutableInvoke() throws Exception;
+       public abstract void invoke() throws Exception;
 
-       /**
-        * Method that will be called if the mutability setting is set to 
mutable
+       /**
+        * Reads the next record into a fresh instance kept in nextRecord and returns
+        * it; an IOException is rethrown as a RuntimeException carrying the cause
         */
-       protected abstract void mutableInvoke() throws Exception;
+       protected StreamRecord<IN> readNext() {
+               this.nextRecord = inSerializer.createInstance();
+               try {
+                       return nextRecord = recordIterator.next(nextRecord);
+               } catch (IOException e) {
+                       throw new RuntimeException("Could not read next record.", e);
+               }
+       }
 
        /**
         * The call of the user implemented function should be implemented here
         */
-       protected abstract void callUserFunction() throws Exception;
+       protected void callUserFunction() throws Exception {
+       }
 
        /**
         * Method for logging exceptions thrown during the user function call
@@ -119,20 +116,6 @@ public abstract class StreamInvokable<IN, OUT> implements 
Serializable {
        }
 
        /**
-        * Method that will be called when the stream starts. The user should 
encode
-        * the processing functionality in {@link #mutableInvoke()} and
-        * {@link #immutableInvoke()}
-        * 
-        */
-       public void invoke() throws Exception {
-               if (this.isMutable) {
-                       mutableInvoke();
-               } else {
-                       immutableInvoke();
-               }
-       }
-
-       /**
         * Open method to be used if the user defined function extends the
         * RichFunction class
         * 
@@ -141,9 +124,7 @@ public abstract class StreamInvokable<IN, OUT> implements 
Serializable {
         */
        public void open(Configuration parameters) throws Exception {
                isRunning = true;
-               if (userFunction instanceof RichFunction) {
-                       ((RichFunction) userFunction).open(parameters);
-               }
+               FunctionUtils.openFunction(userFunction, parameters);
        }
 
        /**
@@ -154,16 +135,10 @@ public abstract class StreamInvokable<IN, OUT> implements 
Serializable {
        public void close() throws Exception {
                isRunning = false;
                collector.close();
-               if (userFunction instanceof RichFunction) {
-                       ((RichFunction) userFunction).close();
-               }
+               FunctionUtils.closeFunction(userFunction);
        }
 
        public void setRuntimeContext(RuntimeContext t) {
                FunctionUtils.setFunctionRuntimeContext(userFunction, t);
        }
-
-       public SourceFunction<OUT> getSourceFunction() {
-               return null;
-       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
index 7924595..0267253 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
@@ -21,30 +21,17 @@ import 
org.apache.flink.streaming.api.invokable.StreamInvokable;
 
 public class CounterInvokable<IN> extends StreamInvokable<IN, Long> {
        private static final long serialVersionUID = 1L;
-       
+
        Long count = 0L;
-       
+
        public CounterInvokable() {
                super(null);
        }
-       
-       @Override
-       protected void immutableInvoke() throws Exception {
-               while ((reuse = recordIterator.next(reuse)) != null) {
-                       callUserFunctionAndLogException();
-                       resetReuse();
-               }
-       }
 
        @Override
-       protected void mutableInvoke() throws Exception {
-               while ((reuse = recordIterator.next(reuse)) != null) {
-                       callUserFunctionAndLogException();
+       public void invoke() throws Exception {
+               while (readNext() != null) {
+                       collector.collect(++count);
                }
        }
-
-       @Override
-       protected void callUserFunction() throws Exception {
-               collector.collect(++count);
-       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index a54b6ad..796196d 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -25,37 +25,25 @@ public class FilterInvokable<IN> extends 
StreamInvokable<IN, IN> {
        private static final long serialVersionUID = 1L;
 
        FilterFunction<IN> filterFunction;
+       private boolean collect;
 
        public FilterInvokable(FilterFunction<IN> filterFunction) {
                super(filterFunction);
                this.filterFunction = filterFunction;
        }
 
-       private boolean canCollect;
-       
        @Override
-       protected void immutableInvoke() throws Exception {
-               while ((reuse = recordIterator.next(reuse)) != null) {
+       public void invoke() throws Exception {
+               while (readNext() != null) {
                        callUserFunctionAndLogException();
-                       if (canCollect) {
-                               collector.collect(reuse.getObject());
-                       }
-                       resetReuse();
-               }
-       }
-
-       @Override
-       protected void mutableInvoke() throws Exception {
-               while ((reuse = recordIterator.next(reuse)) != null) {
-                       callUserFunctionAndLogException();
-                       if (canCollect) {
-                               collector.collect(reuse.getObject());
+                       if (collect) {
+                               collector.collect(nextRecord.getObject());
                        }
                }
        }
 
        @Override
        protected void callUserFunction() throws Exception {
-               canCollect = filterFunction.filter(reuse.getObject());
+               collect = filterFunction.filter(nextRecord.getObject());
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 3452a82..8ff78eb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -31,23 +31,15 @@ public class FlatMapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
        }
 
        @Override
-       protected void immutableInvoke() throws Exception {
-               while ((reuse = recordIterator.next(reuse)) != null) {
-                       callUserFunctionAndLogException();
-                       resetReuse();
-               }
-       }
-
-       @Override
-       protected void mutableInvoke() throws Exception {
-               while ((reuse = recordIterator.next(reuse)) != null) {
+       public void invoke() throws Exception {
+               while (readNext() != null) {
                        callUserFunctionAndLogException();
                }
        }
 
        @Override
        protected void callUserFunction() throws Exception {
-               flatMapper.flatMap(reuse.getObject(), collector);
+               flatMapper.flatMap(nextRecord.getObject(), collector);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
index d64fb6f..fdcf520 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
@@ -38,9 +38,9 @@ public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> {
 
        @Override
        protected void reduce() throws Exception {
-               Object key = reuse.getKey(keySelector);
+               Object key = nextRecord.getKey(keySelector);
                currentValue = values.get(key);
-               nextValue = reuse.getObject();
+               nextValue = nextRecord.getObject();
                if (currentValue != null) {
                        callUserFunctionAndLogException();
                        values.put(key, reduced);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
index ae16be0..33348e4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -37,8 +37,6 @@ import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This invokable allows windowing based on {@link TriggerPolicy} and
@@ -80,8 +78,6 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
         */
        private static final long serialVersionUID = -3469545957144404137L;
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(GroupedWindowInvokable.class);
-
        private KeySelector<IN, ?> keySelector;
        private Configuration parameters;
        private LinkedList<ActiveTriggerPolicy<IN>> 
activeCentralTriggerPolicies;
@@ -226,23 +222,23 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
        }
 
        @Override
-       protected void immutableInvoke() throws Exception {
+       public void invoke() throws Exception {
                // Prevent empty data streams
-               if ((reuse = recordIterator.next(reuse)) == null) {
+               if (readNext() == null) {
                        throw new RuntimeException("DataStream must not be 
empty");
                }
 
                // Continuously run
-               while (reuse != null) {
-                       WindowInvokable<IN, OUT> groupInvokable = 
windowingGroups.get(keySelector.getKey(reuse
-                                       .getObject()));
+               while (nextRecord != null) {
+                       WindowInvokable<IN, OUT> groupInvokable = 
windowingGroups.get(keySelector
+                                       .getKey(nextRecord.getObject()));
                        if (groupInvokable == null) {
-                               groupInvokable = makeNewGroup(reuse);
+                               groupInvokable = makeNewGroup(nextRecord);
                        }
 
                        // Run the precalls for central active triggers
                        for (ActiveTriggerPolicy<IN> trigger : 
activeCentralTriggerPolicies) {
-                               Object[] result = 
trigger.preNotifyTrigger(reuse.getObject());
+                               Object[] result = 
trigger.preNotifyTrigger(nextRecord.getObject());
                                for (Object in : result) {
 
                                        // If central eviction is used, handle 
it here
@@ -260,7 +256,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 
                        // Process non-active central triggers
                        for (TriggerPolicy<IN> triggerPolicy : 
centralTriggerPolicies) {
-                               if 
(triggerPolicy.notifyTrigger(reuse.getObject())) {
+                               if 
(triggerPolicy.notifyTrigger(nextRecord.getObject())) {
                                        
currentTriggerPolicies.add(triggerPolicy);
                                }
                        }
@@ -268,12 +264,12 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
                        if (currentTriggerPolicies.isEmpty()) {
 
                                // only add the element to its group
-                               
groupInvokable.processRealElement(reuse.getObject());
+                               
groupInvokable.processRealElement(nextRecord.getObject());
                                checkForEmptyGroupBuffer(groupInvokable);
 
                                // If central eviction is used, handle it here
                                if (!centralEvictionPolicies.isEmpty()) {
-                                       
evictElements(centralEviction(reuse.getObject(), false));
+                                       
evictElements(centralEviction(nextRecord.getObject(), false));
                                        
deleteOrderForCentralEviction.add(groupInvokable);
                                }
 
@@ -283,20 +279,21 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
                                for (WindowInvokable<IN, OUT> group : 
windowingGroups.values()) {
                                        if (group == groupInvokable) {
                                                // process real with 
initialized policies
-                                               
group.processRealElement(reuse.getObject(), currentTriggerPolicies);
+                                               
group.processRealElement(nextRecord.getObject(), currentTriggerPolicies);
                                        } else {
                                                // process like a fake but also 
initialized with
                                                // policies
-                                               
group.externalTriggerFakeElement(reuse.getObject(), currentTriggerPolicies);
+                                               
group.externalTriggerFakeElement(nextRecord.getObject(),
+                                                               
currentTriggerPolicies);
                                        }
-                                       
-                                       //remove group in case it has an empty 
buffer
-                                       //checkForEmptyGroupBuffer(group);
+
+                                       // remove group in case it has an empty 
buffer
+                                       // checkForEmptyGroupBuffer(group);
                                }
 
                                // If central eviction is used, handle it here
                                if (!centralEvictionPolicies.isEmpty()) {
-                                       
evictElements(centralEviction(reuse.getObject(), true));
+                                       
evictElements(centralEviction(nextRecord.getObject(), true));
                                        
deleteOrderForCentralEviction.add(groupInvokable);
                                }
                        }
@@ -304,9 +301,8 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
                        // clear current trigger list
                        currentTriggerPolicies.clear();
 
-                       // Recreate the reuse-StremRecord object and load next 
StreamRecord
-                       resetReuse();
-                       reuse = recordIterator.next(reuse);
+                       // read next record
+                       readNext();
                }
 
                // Stop all remaining threads from policies
@@ -358,7 +354,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
                                        clonedDistributedEvictionPolicies);
                }
 
-               groupInvokable.initialize(collector, recordIterator, 
inSerializer, isMutable);
+               groupInvokable.setup(taskContext);
                groupInvokable.open(this.parameters);
                windowingGroups.put(keySelector.getKey(element.getObject()), 
groupInvokable);
 
@@ -366,21 +362,6 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
        }
 
        @Override
-       protected void mutableInvoke() throws Exception {
-               if (LOG.isInfoEnabled()) {
-                       LOG.info("There is currently no mutable implementation 
of this operator. Immutable version is used.");
-               }
-               immutableInvoke();
-       }
-
-       @Override
-       protected void callUserFunction() throws Exception {
-               // This method gets never called directly. The user function 
calls are
-               // all delegated to the invokable instanced which 
handle/represent the
-               // groups.
-       }
-
-       @Override
        public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                this.parameters = parameters;
@@ -456,12 +437,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
         *            buffer.
         */
        private void evictElements(int numToEvict) {
-               HashSet<WindowInvokable<IN, OUT>> usedGroups=new 
HashSet<WindowInvokable<IN,OUT>>();
+               HashSet<WindowInvokable<IN, OUT>> usedGroups = new 
HashSet<WindowInvokable<IN, OUT>>();
                for (; numToEvict > 0; numToEvict--) {
-                       WindowInvokable<IN, OUT> 
currentGroup=deleteOrderForCentralEviction.getFirst();
-                       //Do the eviction
+                       WindowInvokable<IN, OUT> currentGroup = 
deleteOrderForCentralEviction.getFirst();
+                       // Do the eviction
                        currentGroup.evictFirst();
-                       //Remember groups which possibly have an empty buffer 
after the eviction
+                       // Remember groups which possibly have an empty buffer 
after the
+                       // eviction
                        usedGroups.add(currentGroup);
                        try {
                                deleteOrderForCentralEviction.removeFirst();
@@ -471,13 +453,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
                        }
 
                }
-               
-               //Remove groups with empty buffer
-               for (WindowInvokable<IN, OUT> group:usedGroups){
+
+               // Remove groups with empty buffer
+               for (WindowInvokable<IN, OUT> group : usedGroups) {
                        checkForEmptyGroupBuffer(group);
                }
        }
-       
+
        /**
         * Checks if the element buffer of a given windowing group is empty. If 
so,
         * the group will be deleted.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 4feb4f3..6be96ec 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -31,22 +31,14 @@ public class MapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
        }
 
        @Override
-       protected void immutableInvoke() throws Exception {
-               while ((reuse = recordIterator.next(reuse)) != null) {
-                       callUserFunctionAndLogException();
-                       resetReuse();
-               }
-       }
-
-       @Override
-       protected void mutableInvoke() throws Exception {
-               while ((reuse = recordIterator.next(reuse)) != null) {
+       public void invoke() throws Exception {
+               while (readNext() != null) {
                        callUserFunctionAndLogException();
                }
        }
 
        @Override
        protected void callUserFunction() throws Exception {
-               collector.collect(mapper.map(reuse.getObject()));
+               collector.collect(mapper.map(nextRecord.getObject()));
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index 4666a85..c9d9e5a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -39,13 +39,8 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
        }
 
        @Override
-       protected void immutableInvoke() throws Exception {
-               mutableInvoke();
-       }
-
-       @Override
-       protected void mutableInvoke() throws Exception {
-               while ((reuse = recordIterator.next(reuse)) != null) {
+       public void invoke() throws Exception {
+               while (readNext() != null) {
                        callUserFunctionAndLogException();
                }
        }
@@ -53,7 +48,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
        @Override
        protected void callUserFunction() throws Exception {
                for (int i = 0; i < this.numFields; i++) {
-                       outTuple.setField(reuse.getField(fields[i]), i);
+                       outTuple.setField(nextRecord.getField(fields[i]), i);
                }
                collector.collect(outTuple);
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index d327c76..4bb78b8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -34,22 +34,14 @@ public class StreamReduceInvokable<IN> extends StreamInvokable<IN, IN> {
        }
 
        @Override
-       protected void immutableInvoke() throws Exception {
-               while ((reuse = recordIterator.next(reuse)) != null) {
-                       reduce();
-                       resetReuse();
-               }
-       }
-
-       @Override
-       protected void mutableInvoke() throws Exception {
-               while ((reuse = recordIterator.next(reuse)) != null) {
+       public void invoke() throws Exception {
+               while (readNext() != null) {
                        reduce();
                }
        }
 
        protected void reduce() throws Exception {
-               nextValue = reuse.getObject();
+               nextValue = nextRecord.getObject();
                callUserFunctionAndLogException();
 
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
index 0e740bb..ea891c9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -29,8 +29,6 @@ import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, 
OUT> {
 
@@ -39,8 +37,6 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
         */
        private static final long serialVersionUID = -8038984294071650730L;
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(WindowInvokable.class);
-
        private LinkedList<TriggerPolicy<IN>> triggerPolicies;
        private LinkedList<EvictionPolicy<IN>> evictionPolicies;
        private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
@@ -120,20 +116,19 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
        }
 
        @Override
-       protected void immutableInvoke() throws Exception {
+       public void invoke() throws Exception {
 
                // Prevent empty data streams
-               if ((reuse = recordIterator.next(reuse)) == null) {
+               if (readNext() == null) {
                        throw new RuntimeException("DataStream must not be 
empty");
                }
 
                // Continuously run
-               while (reuse != null) {
-                       processRealElement(reuse.getObject());
+               while (nextRecord != null) {
+                       processRealElement(nextRecord.getObject());
 
-                       // Recreate the reuse-StremRecord object and load next 
StreamRecord
-                       resetReuse();
-                       reuse = recordIterator.next(reuse);
+                       // Load next StreamRecord
+                       readNext();
                }
 
                // Stop all remaining threads from policies
@@ -146,14 +141,6 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 
        }
 
-       @Override
-       protected void mutableInvoke() throws Exception {
-               if (LOG.isInfoEnabled()) {
-                       LOG.info("There is currently no mutable implementation 
of this operator. Immutable version is used.");
-               }
-               immutableInvoke();
-       }
-
        /**
         * This method gets called in case of an grouped windowing in case 
central
         * trigger occurred and the arriving element causing the trigger is not 
part
@@ -363,10 +350,10 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
         * 
         * @return true in case the buffer is empty otherwise false.
         */
-       protected boolean isBufferEmpty(){
+       protected boolean isBufferEmpty() {
                return buffer.isEmpty();
        }
-       
+
        /**
         * This method does the final reduce at the end of the stream and emits 
the
         * result.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
index edf5a8f..4ed49fd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
@@ -63,7 +63,7 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
        }
 
        @Override
-       public void immutableInvoke() throws Exception {
+       public void invoke() throws Exception {
                while (true) {
                        int next = recordIterator.next(reuse1, reuse2);
                        if (next == 0) {
@@ -100,13 +100,6 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
                return batch2;
        }
 
-       @Override
-       // TODO: implement mutableInvoke for reduce
-       protected void mutableInvoke() throws Exception {
-               System.out.println("Immutable setting is used");
-               immutableInvoke();
-       }
-
        protected void reduce1(StreamBatch<IN1> batch) {
                this.currentBatch1 = batch;
                callUserFunctionAndLogException1();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 25ed62c..604873e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
 import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,19 +45,18 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
        protected TypeSerializer<IN1> serializer1;
        protected TypeSerializer<IN2> serializer2;
 
-       public void initialize(Collector<OUT> collector,
-                       CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> 
recordIterator,
-                       StreamRecordSerializer<IN1> serializer1, 
StreamRecordSerializer<IN2> serializer2,
-                       boolean isMutable) {
-               this.collector = collector;
+       @Override
+       public void setup(StreamTaskContext<OUT> taskContext) {
+               this.collector = taskContext.getOutputCollector();
+
+               this.recordIterator = taskContext.getCoReader();
+
+               this.srSerializer1 = taskContext.getInputSerializer(0);
+               this.srSerializer2 = taskContext.getInputSerializer(1);
 
-               this.recordIterator = recordIterator;
-               this.reuse1 = serializer1.createInstance();
-               this.reuse2 = serializer2.createInstance();
+               this.reuse1 = srSerializer1.createInstance();
+               this.reuse2 = srSerializer2.createInstance();
 
-               this.srSerializer1 = serializer1;
-               this.srSerializer2 = serializer2;
-               this.isMutable = isMutable;
                this.serializer1 = srSerializer1.getObjectSerializer();
                this.serializer2 = srSerializer2.getObjectSerializer();
        }
@@ -76,7 +75,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
        }
 
        @Override
-       protected void immutableInvoke() throws Exception {
+       public void invoke() throws Exception {
                while (true) {
                        int next = recordIterator.next(reuse1, reuse2);
                        if (next == 0) {
@@ -93,22 +92,6 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
                }
        }
 
-       @Override
-       protected void mutableInvoke() throws Exception {
-               while (true) {
-                       int next = recordIterator.next(reuse1, reuse2);
-                       if (next == 0) {
-                               break;
-                       } else if (next == 1) {
-                               initialize1();
-                               handleStream1();
-                       } else {
-                               initialize2();
-                               handleStream2();
-                       }
-               }
-       }
-
        protected abstract void handleStream1() throws Exception;
 
        protected abstract void handleStream2() throws Exception;
@@ -147,8 +130,4 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
                }
        }
 
-       @Override
-       protected void callUserFunction() throws Exception {
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
index be3f578..7df5668 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
@@ -59,11 +59,6 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT>
        }
 
        @Override
-       protected void mutableInvoke() throws Exception {
-               throw new RuntimeException("Reducing mutable sliding batch is 
not supported.");
-       }
-
-       @Override
        protected void handleStream1() throws Exception {
                window.addToBuffer1(reuse1.getObject());
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 0058c66..9321bc7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -30,8 +30,6 @@ import org.apache.flink.util.MutableObjectIterator;
 
 public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 
-       private OutputHandler<OUT> outputHandler;
-
        protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
        protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
 
@@ -68,8 +66,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
        @Override
        protected void setInvokable() {
                userInvokable = configuration.getUserInvokable(userClassLoader);
-               userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1,
-                               inputDeserializer2, isMutable);
+               userInvokable.setup(this);
        }
 
        protected void setConfigInputs() throws StreamVertexException {
@@ -105,4 +102,36 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
                outputHandler.invokeUserFunction("CO-TASK", userInvokable);
        }
 
+       @SuppressWarnings("unchecked")
+       @Override
+       public <X> MutableObjectIterator<X> getInput(int index) {
+               switch (index) {
+               case 0:
+                       return (MutableObjectIterator<X>) inputIter1;
+               case 1:
+                       return (MutableObjectIterator<X>) inputIter2;
+               default:
+                       throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
+               switch (index) {
+               case 0:
+                       return (StreamRecordSerializer<X>) inputDeserializer1;
+               case 1:
+                       return (StreamRecordSerializer<X>) inputDeserializer2;
+               default:
+                       throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public <X, Y> CoReaderIterator<X, Y> getCoReader() {
+               return (CoReaderIterator<X, Y>) coIter;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
new file mode 100644
index 0000000..7fbab3b
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public interface StreamTaskContext<OUT> {
+
+       StreamConfig getConfig();
+
+       ClassLoader getUserCodeClassLoader();
+
+       <X> MutableObjectIterator<X> getInput(int index);
+
+       <X> StreamRecordSerializer<X> getInputSerializer(int index);
+
+       Collector<OUT> getOutputCollector();
+
+       <X, Y> CoReaderIterator<X, Y> getCoReader();
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 13e6c9f..7504efd 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -23,9 +23,13 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.CoReaderIterator;
 import org.apache.flink.streaming.state.OperatorState;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
 
-public class StreamVertex<IN, OUT> extends AbstractInvokable {
+public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT> {
 
        private static int numTasks;
 
@@ -33,12 +37,11 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
        protected int instanceID;
        protected String name;
        private static int numVertices = 0;
-       protected boolean isMutable;
        protected Object function;
        protected String functionName;
 
        private InputHandler<IN> inputHandler;
-       private OutputHandler<OUT> outputHandler;
+       protected OutputHandler<OUT> outputHandler;
        private StreamInvokable<IN, OUT> userInvokable;
 
        private StreamingRuntimeContext context;
@@ -68,7 +71,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
                this.userClassLoader = getUserCodeClassLoader();
                this.configuration = new StreamConfig(getTaskConfiguration());
                this.name = configuration.getVertexName();
-               this.isMutable = configuration.getMutability();
                this.functionName = configuration.getFunctionName();
                this.function = configuration.getFunction(userClassLoader);
                this.states = configuration.getOperatorStates(userClassLoader);
@@ -89,8 +91,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 
        protected void setInvokable() {
                userInvokable = configuration.getUserInvokable(userClassLoader);
-               userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
-                               inputHandler.getInputSerializer(), isMutable);
+               userInvokable.setup(this);
        }
 
        public String getName() {
@@ -111,4 +112,39 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
        public void invoke() throws Exception {
                outputHandler.invokeUserFunction("TASK", userInvokable);
        }
+
+       @Override
+       public StreamConfig getConfig() {
+               return configuration;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public <X> MutableObjectIterator<X> getInput(int index) {
+               if (index == 0) {
+                       return (MutableObjectIterator<X>) inputHandler.getInputIter();
+               } else {
+                       throw new IllegalArgumentException("There is only 1 input");
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
+               if (index == 0) {
+                       return (StreamRecordSerializer<X>) inputHandler.getInputSerializer();
+               } else {
+                       throw new IllegalArgumentException("There is only 1 input");
+               }
+       }
+
+       @Override
+       public Collector<OUT> getOutputCollector() {
+               return outputHandler.getCollector();
+       }
+
+       @Override
+       public <X, Y> CoReaderIterator<X, Y> getCoReader() {
+               throw new IllegalArgumentException("CoReader not available");
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 0fbf72a..9376166 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import 
org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
@@ -102,24 +102,24 @@ public class AggregationFunctionTest {
                                .getAggregator(1, type1, AggregationType.MAX);
                ReduceFunction<Integer> maxFunction0 = 
ComparableAggregator.getAggregator(0,
                                type2, AggregationType.MAX);
-               List<Tuple2<Integer, Integer>> sumList = 
MockInvokable.createAndExecute(
+               List<Tuple2<Integer, Integer>> sumList = 
MockContext.createAndExecute(
                                new StreamReduceInvokable<Tuple2<Integer, 
Integer>>(sumFunction), getInputList());
 
-               List<Tuple2<Integer, Integer>> minList = 
MockInvokable.createAndExecute(
+               List<Tuple2<Integer, Integer>> minList = 
MockContext.createAndExecute(
                                new StreamReduceInvokable<Tuple2<Integer, 
Integer>>(minFunction), getInputList());
 
-               List<Tuple2<Integer, Integer>> maxList = 
MockInvokable.createAndExecute(
+               List<Tuple2<Integer, Integer>> maxList = 
MockContext.createAndExecute(
                                new StreamReduceInvokable<Tuple2<Integer, 
Integer>>(maxFunction), getInputList());
 
-               List<Tuple2<Integer, Integer>> groupedSumList = 
MockInvokable.createAndExecute(
+               List<Tuple2<Integer, Integer>> groupedSumList = 
MockContext.createAndExecute(
                                new GroupedReduceInvokable<Tuple2<Integer, 
Integer>>(sumFunction,
                                                new 
TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
 
-               List<Tuple2<Integer, Integer>> groupedMinList = 
MockInvokable.createAndExecute(
+               List<Tuple2<Integer, Integer>> groupedMinList = 
MockContext.createAndExecute(
                                new GroupedReduceInvokable<Tuple2<Integer, 
Integer>>(minFunction,
                                                new 
TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
 
-               List<Tuple2<Integer, Integer>> groupedMaxList = 
MockInvokable.createAndExecute(
+               List<Tuple2<Integer, Integer>> groupedMaxList = 
MockContext.createAndExecute(
                                new GroupedReduceInvokable<Tuple2<Integer, 
Integer>>(maxFunction,
                                                new 
TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
 
@@ -129,11 +129,11 @@ public class AggregationFunctionTest {
                assertEquals(expectedGroupSumList, groupedSumList);
                assertEquals(expectedGroupMinList, groupedMinList);
                assertEquals(expectedGroupMaxList, groupedMaxList);
-               assertEquals(expectedSumList0, MockInvokable.createAndExecute(
+               assertEquals(expectedSumList0, MockContext.createAndExecute(
                                new 
StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
-               assertEquals(expectedMinList0, MockInvokable.createAndExecute(
+               assertEquals(expectedMinList0, MockContext.createAndExecute(
                                new 
StreamReduceInvokable<Integer>(minFunction0), simpleInput));
-               assertEquals(expectedMaxList0, MockInvokable.createAndExecute(
+               assertEquals(expectedMaxList0, MockContext.createAndExecute(
                                new 
StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
@@ -210,16 +210,16 @@ public class AggregationFunctionTest {
                minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
                minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
 
-               assertEquals(maxByFirstExpected, MockInvokable.createAndExecute(
+               assertEquals(maxByFirstExpected, MockContext.createAndExecute(
                                new StreamReduceInvokable<Tuple2<Integer, 
Integer>>(maxByFunctionFirst),
                                getInputList()));
-               assertEquals(maxByLastExpected, MockInvokable.createAndExecute(
+               assertEquals(maxByLastExpected, MockContext.createAndExecute(
                                new StreamReduceInvokable<Tuple2<Integer, 
Integer>>(maxByFunctionLast),
                                getInputList()));
-               assertEquals(minByLastExpected, MockInvokable.createAndExecute(
+               assertEquals(minByLastExpected, MockContext.createAndExecute(
                                new StreamReduceInvokable<Tuple2<Integer, 
Integer>>(minByFunctionLast),
                                getInputList()));
-               assertEquals(minByFirstExpected, MockInvokable.createAndExecute(
+               assertEquals(minByFirstExpected, MockContext.createAndExecute(
                                new StreamReduceInvokable<Tuple2<Integer, 
Integer>>(minByFunctionFirst),
                                getInputList()));
 
@@ -284,16 +284,16 @@ public class AggregationFunctionTest {
                minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
                minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
 
-               assertEquals(maxByFirstExpected, MockInvokable.createAndExecute(
+               assertEquals(maxByFirstExpected, MockContext.createAndExecute(
                                new StreamReduceInvokable<Tuple2<Integer, 
Integer>>(maxByFunctionFirst),
                                getInputList()));
-               assertEquals(maxByLastExpected, MockInvokable.createAndExecute(
+               assertEquals(maxByLastExpected, MockContext.createAndExecute(
                                new StreamReduceInvokable<Tuple2<Integer, 
Integer>>(maxByFunctionLast),
                                getInputList()));
-               assertEquals(minByLastExpected, MockInvokable.createAndExecute(
+               assertEquals(minByLastExpected, MockContext.createAndExecute(
                                new StreamReduceInvokable<Tuple2<Integer, 
Integer>>(minByFunctionLast),
                                getInputList()));
-               assertEquals(minByFirstExpected, MockInvokable.createAndExecute(
+               assertEquals(minByFirstExpected, MockContext.createAndExecute(
                                new StreamReduceInvokable<Tuple2<Integer, 
Integer>>(minByFunctionFirst),
                                getInputList()));
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 4ab1be2..f46bfc5 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -113,7 +113,7 @@ public class DirectedOutputTest {
                source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
                source.selectAll().addSink(new ListSink(ALL));
 
-               env.executeTest(128);
+               env.executeTest(32);
                assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), 
outputs.get(EVEN));
                assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), 
outputs.get(ODD_AND_TEN));
                assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 
10L, 11L),

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
index 44b0513..1db286c 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import 
org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.junit.Test;
 
 public class CoBatchReduceTest {
@@ -84,7 +84,7 @@ public class CoBatchReduceTest {
                expected.add("def");
                expected.add("ghi");
 
-               List<String> result = 
MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs, inputs2);
 
                Collections.sort(result);
                Collections.sort(expected);
@@ -125,7 +125,7 @@ public class CoBatchReduceTest {
                expected.add("efg");
                expected.add("ghi");
 
-               List<String> result = 
MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs, inputs2);
 
                Collections.sort(result);
                Collections.sort(expected);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
index 5009496..a91bd0c 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -59,7 +59,7 @@ public class CoFlatMapTest implements Serializable {
 
                List<String> expectedList = Arrays.asList("a", "b", "c", "1", 
"d", "e", "f", "2", "g", "h",
                                "e", "3", "4", "5");
-               List<String> actualList = 
MockCoInvokable.createAndExecute(invokable,
+               List<String> actualList = 
MockCoContext.createAndExecute(invokable,
                                Arrays.asList("abc", "def", "ghe"), 
Arrays.asList(1, 2, 3, 4, 5));
 
                assertEquals(expectedList, actualList);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
index ce01a7d..bc19a89 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import 
org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
@@ -99,7 +99,7 @@ public class CoGroupedBatchReduceTest {
                                new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new 
TupleKeySelector(0),
                                new TupleKeySelector(0));
 
-               List<String> result = 
MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs1, inputs2);
 
                Collections.sort(result);
                Collections.sort(expected);
@@ -146,7 +146,7 @@ public class CoGroupedBatchReduceTest {
                                new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new 
TupleKeySelector(0),
                                new TupleKeySelector(0));
 
-               List<String> result = 
MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs1, inputs2);
 
                Collections.sort(result);
                Collections.sort(expected);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
index 4570e23..15d42a4 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import 
org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
@@ -77,7 +77,7 @@ public class CoGroupedReduceTest {
                List<String> expected = Arrays.asList("word1", "1", "word2", 
"2", "word1word3", "3", "5",
                                "7");
 
-               List<String> actualList = 
MockCoInvokable.createAndExecute(invokable,
+               List<String> actualList = 
MockCoContext.createAndExecute(invokable,
                                Arrays.asList(word1, word2, word3), 
Arrays.asList(int1, int2, int3, int4, int5));
 
                assertEquals(expected, actualList);
@@ -87,7 +87,7 @@ public class CoGroupedReduceTest {
 
                expected = Arrays.asList("word1", "1", "word2", "2", 
"word2word3", "3", "5", "7");
 
-               actualList = MockCoInvokable.createAndExecute(invokable,
+               actualList = MockCoContext.createAndExecute(invokable,
                                Arrays.asList(word1, word2, word3), 
Arrays.asList(int1, int2, int3, int4, int5));
 
                assertEquals(expected, actualList);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
index f36a7b5..b5a7e8d 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import 
org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
@@ -129,7 +129,7 @@ public class CoGroupedWindowReduceTest {
                                new TupleKeySelector( 0), new 
MyTimeStamp<Tuple2<String, Integer>>(
                                                timestamps1), new 
MyTimeStamp<Tuple2<String, String>>(timestamps2));
 
-               List<String> result = 
MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs1, inputs2);
 
                Collections.sort(result);
                Collections.sort(expected);
@@ -182,7 +182,7 @@ public class CoGroupedWindowReduceTest {
                                new TupleKeySelector( 0), new 
MyTimeStamp<Tuple2<String, Integer>>(
                                                timestamps1), new 
MyTimeStamp<Tuple2<String, String>>(timestamps2));
 
-               List<String> result = 
MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs1, inputs2);
 
                Collections.sort(result);
                Collections.sort(expected);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index 9c62aec..93d1741 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.junit.Test;
 
 public class CoMapTest implements Serializable {
@@ -50,7 +50,7 @@ public class CoMapTest implements Serializable {
                CoMapInvokable<Double, Integer, String> invokable = new 
CoMapInvokable<Double, Integer, String>(new MyCoMap());
 
                List<String> expectedList = Arrays.asList("1.1", "1", "1.2", 
"2", "1.3", "3", "1.4", "1.5");
-               List<String> actualList = 
MockCoInvokable.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 
1.5), Arrays.asList(1, 2, 3));
+               List<String> actualList = 
MockCoContext.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 
1.5), Arrays.asList(1, 2, 3));
                
                assertEquals(expectedList, actualList);
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
index 996320a..3343ba0 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.junit.Test;
 
 public class CoStreamReduceTest {
@@ -62,7 +62,7 @@ public class CoStreamReduceTest {
                                new MyCoReduceFunction());
 
                List<Integer> expected1 = Arrays.asList(1, 9, 2, 99, 6, 998, 24);
-               List<Integer> result = MockCoInvokable.createAndExecute(coReduce,
+               List<Integer> result = MockCoContext.createAndExecute(coReduce,
                                Arrays.asList(1, 2, 3, 4), Arrays.asList("9", "9", "8"));
 
                assertEquals(expected1, result);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
index 4604b27..90ad483 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
@@ -28,7 +28,7 @@ import java.util.List;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.junit.Test;
 
 public class CoWindowReduceTest {
@@ -114,7 +114,7 @@ public class CoWindowReduceTest {
                expected.add("abcde");
                expected.add("fghi");
 
-               List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+               List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
 
                Collections.sort(result);
                Collections.sort(expected);
@@ -160,7 +160,7 @@ public class CoWindowReduceTest {
                expected.add("fgh");
                expected.add("hi");
 
-               List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+               List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
 
                Collections.sort(result);
                Collections.sort(expected);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
index ebdd963..c6d446a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.co.CoWindowFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -148,7 +148,7 @@ public class CoWindowTest {
                expected1.add(0);
                expected1.add(1);
 
-               List<Integer> actual1 = MockCoInvokable.createAndExecute(invokable1, input11, input12);
+               List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12);
                assertEquals(expected1, actual1);
 
                CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
@@ -183,7 +183,7 @@ public class CoWindowTest {
                expected2.add(8);
                expected2.add(7);
 
-               List<Integer> actual2 = MockCoInvokable.createAndExecute(invokable2, input21, input22);
+               List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22);
                assertEquals(expected2, actual2);
        }
 }

Reply via email to