[scala] [streaming] Base functionality added for streaming scala api

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/34353f66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/34353f66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/34353f66

Branch: refs/heads/master
Commit: 34353f6658e9a4dd50ad860e17eee94804b76ccb
Parents: 87d699d
Author: Gyula Fora <[email protected]>
Authored: Thu Dec 11 15:22:03 2014 +0100
Committer: Gyula Fora <[email protected]>
Committed: Fri Jan 2 18:34:38 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |  37 ++---
 .../flink/streaming/api/StreamConfig.java       |  16 --
 .../api/datastream/ConnectedDataStream.java     |  47 +++---
 .../streaming/api/datastream/DataStream.java    |  70 ++++----
 .../api/datastream/GroupedDataStream.java       |  10 +-
 .../api/datastream/StreamProjection.java        | 161 ++++++++-----------
 .../api/datastream/WindowedDataStream.java      |  14 +-
 .../environment/StreamExecutionEnvironment.java |  37 ++---
 .../function/source/FromElementsFunction.java   |  90 ++++++-----
 .../api/streamvertex/StreamVertex.java          |   3 +-
 flink-scala/pom.xml                             |   6 +
 .../flink/api/scala/streaming/DataStream.scala  |  76 +++++++++
 .../streaming/StreamExecutionEnvironment.scala  | 150 +++++++++++++++++
 13 files changed, 425 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index d66e388..f358de9 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -72,7 +72,6 @@ public class JobGraphBuilder {
        private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
        private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
        private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
-       private Map<String, byte[]> serializedFunctions;
        private Map<String, byte[]> outputSelectors;
        private Map<String, Class<? extends AbstractInvokable>> vertexClasses;
        private Map<String, Integer> iterationIds;
@@ -104,7 +103,6 @@ public class JobGraphBuilder {
                typeSerializersIn2 = new HashMap<String, 
StreamRecordSerializer<?>>();
                typeSerializersOut1 = new HashMap<String, 
StreamRecordSerializer<?>>();
                typeSerializersOut2 = new HashMap<String, 
StreamRecordSerializer<?>>();
-               serializedFunctions = new HashMap<String, byte[]>();
                outputSelectors = new HashMap<String, byte[]>();
                vertexClasses = new HashMap<String, Class<? extends 
AbstractInvokable>>();
                iterationIds = new HashMap<String, Integer>();
@@ -133,18 +131,14 @@ public class JobGraphBuilder {
         *            Output type for serialization
         * @param operatorName
         *            Operator type
-        * @param serializedFunction
-        *            Serialized udf
         * @param parallelism
         *            Number of parallel instances created
         */
        public <IN, OUT> void addStreamVertex(String vertexName,
                        StreamInvokable<IN, OUT> invokableObject, 
TypeInformation<IN> inTypeInfo,
-                       TypeInformation<OUT> outTypeInfo, String operatorName, 
byte[] serializedFunction,
-                       int parallelism) {
+                       TypeInformation<OUT> outTypeInfo, String operatorName, 
int parallelism) {
 
-               addVertex(vertexName, StreamVertex.class, invokableObject, 
operatorName,
-                               serializedFunction, parallelism);
+               addVertex(vertexName, StreamVertex.class, invokableObject, 
operatorName, parallelism);
 
                StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? 
new StreamRecordSerializer<IN>(
                                inTypeInfo) : null;
@@ -171,8 +165,6 @@ public class JobGraphBuilder {
         *            Output type for serialization
         * @param operatorName
         *            Operator type
-        * @param serializedFunction
-        *            Serialized udf
         * @param parallelism
         *            Number of parallel instances created
         */
@@ -185,7 +177,7 @@ public class JobGraphBuilder {
                                function);
 
                addStreamVertex(vertexName, invokableObject, inTypeInfo, 
outTypeInfo, operatorName,
-                               serializedFunction, parallelism);
+                               parallelism);
        }
 
        /**
@@ -206,7 +198,7 @@ public class JobGraphBuilder {
        public void addIterationHead(String vertexName, String iterationHead, 
Integer iterationID,
                        int parallelism, long waitTime) {
 
-               addVertex(vertexName, StreamIterationHead.class, null, null, 
null, parallelism);
+               addVertex(vertexName, StreamIterationHead.class, null, null, 
parallelism);
 
                iterationIds.put(vertexName, iterationID);
                iterationIDtoHeadName.put(iterationID, vertexName);
@@ -247,7 +239,7 @@ public class JobGraphBuilder {
                        throw new RuntimeException("Buffer timeout 0 at 
iteration tail is not supported.");
                }
 
-               addVertex(vertexName, StreamIterationTail.class, null, null, 
null, parallelism);
+               addVertex(vertexName, StreamIterationTail.class, null, null, 
parallelism);
 
                iterationIds.put(vertexName, iterationID);
                iterationIDtoTailName.put(iterationID, vertexName);
@@ -264,10 +256,9 @@ public class JobGraphBuilder {
        public <IN1, IN2, OUT> void addCoTask(String vertexName,
                        CoInvokable<IN1, IN2, OUT> taskInvokableObject, 
TypeInformation<IN1> in1TypeInfo,
                        TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> 
outTypeInfo,
-                       String operatorName, byte[] serializedFunction, int 
parallelism) {
+                       String operatorName, int parallelism) {
 
-               addVertex(vertexName, CoStreamVertex.class, 
taskInvokableObject, operatorName,
-                               serializedFunction, parallelism);
+               addVertex(vertexName, CoStreamVertex.class, 
taskInvokableObject, operatorName, parallelism);
 
                addTypeSerializers(vertexName, new 
StreamRecordSerializer<IN1>(in1TypeInfo),
                                new StreamRecordSerializer<IN2>(in2TypeInfo), 
new StreamRecordSerializer<OUT>(
@@ -289,20 +280,16 @@ public class JobGraphBuilder {
         *            The user defined invokable object
         * @param operatorName
         *            Type of the user defined operator
-        * @param serializedFunction
-        *            Serialized operator
         * @param parallelism
         *            Number of parallel instances created
         */
        private void addVertex(String vertexName, Class<? extends 
AbstractInvokable> vertexClass,
-                       StreamInvokable<?, ?> invokableObject, String 
operatorName, byte[] serializedFunction,
-                       int parallelism) {
+                       StreamInvokable<?, ?> invokableObject, String 
operatorName, int parallelism) {
 
                vertexClasses.put(vertexName, vertexClass);
                setParallelism(vertexName, parallelism);
                invokableObjects.put(vertexName, invokableObject);
                operatorNames.put(vertexName, operatorName);
-               serializedFunctions.put(vertexName, serializedFunction);
                outEdgeList.put(vertexName, new ArrayList<String>());
                outEdgeType.put(vertexName, new ArrayList<Integer>());
                outEdgeNames.put(vertexName, new ArrayList<List<String>>());
@@ -333,8 +320,6 @@ public class JobGraphBuilder {
                // Get vertex attributes
                Class<? extends AbstractInvokable> vertexClass = 
vertexClasses.get(vertexName);
                StreamInvokable<?, ?> invokableObject = 
invokableObjects.get(vertexName);
-               String operatorName = operatorNames.get(vertexName);
-               byte[] serializedFunction = serializedFunctions.get(vertexName);
                int parallelism = vertexParallelism.get(vertexName);
                byte[] outputSelector = outputSelectors.get(vertexName);
                Map<String, OperatorState<?>> state = 
operatorStates.get(vertexName);
@@ -362,7 +347,6 @@ public class JobGraphBuilder {
                // Set vertex config
                config.setUserInvokable(invokableObject);
                config.setVertexName(vertexName);
-               config.setFunction(serializedFunction, operatorName);
                config.setOutputSelector(outputSelector);
                config.setOperatorStates(state);
 
@@ -522,8 +506,8 @@ public class JobGraphBuilder {
        }
 
        /**
-        * Sets udf operator and TypeSerializerWrapper from one vertex to 
another,
-        * used with some sinks.
+        * Sets TypeSerializerWrapper from one vertex to another, used with some
+        * sinks.
         * 
         * @param from
         *            from
@@ -532,7 +516,6 @@ public class JobGraphBuilder {
         */
        public void setBytesFrom(String from, String to) {
                operatorNames.put(to, operatorNames.get(from));
-               serializedFunctions.put(to, serializedFunctions.get(from));
 
                typeSerializersIn1.put(to, typeSerializersOut1.get(from));
                typeSerializersIn2.put(to, typeSerializersOut2.get(from));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 9800b63..8837b85 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -47,7 +47,6 @@ public class StreamConfig {
        private static final String OUTPUT_SELECTOR = "outputSelector";
        private static final String DIRECTED_EMIT = "directedEmit";
        private static final String FUNCTION_NAME = "operatorName";
-       private static final String FUNCTION = "operator";
        private static final String VERTEX_NAME = "vertexName";
        private static final String SERIALIZEDUDF = "serializedudf";
        private static final String USER_FUNCTION = "userfunction";
@@ -173,21 +172,6 @@ public class StreamConfig {
                return config.getString(VERTEX_NAME, null);
        }
 
-       public void setFunction(byte[] serializedFunction, String functionName) 
{
-               if (serializedFunction != null) {
-                       config.setBytes(FUNCTION, serializedFunction);
-                       config.setString(FUNCTION_NAME, functionName);
-               }
-       }
-
-       public Object getFunction(ClassLoader cl) {
-               try {
-                       return 
InstantiationUtil.readObjectFromConfig(this.config, FUNCTION, cl);
-               } catch (Exception e) {
-                       throw new RuntimeException("Cannot deserialize 
invokable object", e);
-               }
-       }
-
        public String getFunctionName() {
                return config.getString(FUNCTION_NAME, "");
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index dcc3dab..c663315 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -17,13 +17,9 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import java.io.Serializable;
 import java.util.List;
 
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
@@ -53,7 +49,7 @@ import org.apache.flink.util.Collector;
  * The ConnectedDataStream represents a stream for two different data types. It
  * can be used to apply transformations like {@link CoMapFunction} on two
  * {@link DataStream}s
- *
+ * 
  * @param <IN1>
  *            Type of the first input data steam.
  * @param <IN2>
@@ -417,8 +413,9 @@ public class ConnectedDataStream<IN1, IN2> {
                TypeInformation<OUT> outTypeInfo = 
TypeExtractor.createTypeInfo(CoMapFunction.class,
                                coMapper.getClass(), 2, null, null);
 
-               return addCoFunction("coMap", clean(coMapper), outTypeInfo,
-                               new CoMapInvokable<IN1, IN2, 
OUT>(clean(coMapper)));
+               return addCoFunction("coMap", outTypeInfo, new 
CoMapInvokable<IN1, IN2, OUT>(
+                               clean(coMapper)));
+
        }
 
        /**
@@ -441,8 +438,8 @@ public class ConnectedDataStream<IN1, IN2> {
                TypeInformation<OUT> outTypeInfo = 
TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
                                coFlatMapper.getClass(), 2, null, null);
 
-               return addCoFunction("coFlatMap", clean(coFlatMapper), 
outTypeInfo,
-                               new CoFlatMapInvokable<IN1, IN2, 
OUT>(clean(coFlatMapper)));
+               return addCoFunction("coFlatMap", outTypeInfo, new 
CoFlatMapInvokable<IN1, IN2, OUT>(
+                               clean(coFlatMapper)));
        }
 
        /**
@@ -466,8 +463,8 @@ public class ConnectedDataStream<IN1, IN2> {
                TypeInformation<OUT> outTypeInfo = 
TypeExtractor.createTypeInfo(CoReduceFunction.class,
                                coReducer.getClass(), 2, null, null);
 
-               return addCoFunction("coReduce", clean(coReducer), outTypeInfo,
-                               getReduceInvokable(clean(coReducer)));
+               return addCoFunction("coReduce", outTypeInfo, 
getReduceInvokable(clean(coReducer)));
+
        }
 
        /**
@@ -531,9 +528,9 @@ public class ConnectedDataStream<IN1, IN2> {
                TypeInformation<OUT> outTypeInfo = 
TypeExtractor.createTypeInfo(CoWindowFunction.class,
                                coWindowFunction.getClass(), 2, null, null);
 
-               return addCoFunction("coWindowReduce", clean(coWindowFunction), 
outTypeInfo,
-                               new CoWindowInvokable<IN1, IN2, 
OUT>(clean(coWindowFunction), windowSize, slideInterval,
-                                               timestamp1, timestamp2));
+               return addCoFunction("coWindowReduce", outTypeInfo, new 
CoWindowInvokable<IN1, IN2, OUT>(
+                               clean(coWindowFunction), windowSize, 
slideInterval, timestamp1, timestamp2));
+
        }
 
        protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
@@ -607,27 +604,21 @@ public class ConnectedDataStream<IN1, IN2> {
                        throw new IllegalArgumentException("Slide interval must 
be positive");
                }
 
-               return addCoFunction("coWindowReduce", clean(coWindowFunction), 
outTypeInfo,
-                               new CoWindowInvokable<IN1, IN2, 
OUT>(clean(coWindowFunction), windowSize, slideInterval,
-                                               timestamp1, timestamp2));
+               return addCoFunction("coWindowReduce", outTypeInfo, new 
CoWindowInvokable<IN1, IN2, OUT>(
+                               clean(coWindowFunction), windowSize, 
slideInterval, timestamp1, timestamp2));
+
        }
 
-       protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String 
functionName,
-                       final Function function, TypeInformation<OUT> 
outTypeInfo,
-                       CoInvokable<IN1, IN2, OUT> functionInvokable) {
+       public <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String 
functionName,
+                       TypeInformation<OUT> outTypeInfo, CoInvokable<IN1, IN2, 
OUT> functionInvokable) {
 
                @SuppressWarnings({ "unchecked", "rawtypes" })
                SingleOutputStreamOperator<OUT, ?> returnStream = new 
SingleOutputStreamOperator(
                                environment, functionName, outTypeInfo);
 
-               try {
-                       
dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
-                                       getInputType1(), getInputType2(), 
outTypeInfo, functionName,
-                                       
SerializationUtils.serialize((Serializable) function),
-                                       environment.getDegreeOfParallelism());
-               } catch (SerializationException e) {
-                       throw new RuntimeException("Cannot serialize user 
defined function");
-               }
+               dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), 
functionInvokable,
+                               getInputType1(), getInputType2(), outTypeInfo, 
functionName,
+                               environment.getDegreeOfParallelism());
 
                dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
                dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 6e8da0a..04929c1 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -17,15 +17,11 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
@@ -401,8 +397,7 @@ public class DataStream<OUT> {
 
                TypeInformation<R> outType = 
TypeExtractor.getMapReturnTypes(clean(mapper), getType());
 
-               return addFunction("map", clean(mapper), getType(), outType, 
new MapInvokable<OUT, R>(
-                               clean(mapper)));
+               return transform("map", outType, new MapInvokable<OUT, 
R>(clean(mapper)));
        }
 
        /**
@@ -423,10 +418,11 @@ public class DataStream<OUT> {
         */
        public <R> SingleOutputStreamOperator<R, ?> 
flatMap(FlatMapFunction<OUT, R> flatMapper) {
 
-               TypeInformation<R> outType = 
TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType());
+               TypeInformation<R> outType = 
TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
+                               getType());
+
+               return transform("flatMap", outType, new FlatMapInvokable<OUT, 
R>(clean(flatMapper)));
 
-               return addFunction("flatMap", clean(flatMapper), getType(), 
outType,
-                               new FlatMapInvokable<OUT, 
R>(clean(flatMapper)));
        }
 
        /**
@@ -442,8 +438,8 @@ public class DataStream<OUT> {
         */
        public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> 
reducer) {
 
-               return addFunction("reduce", clean(reducer), getType(), 
getType(),
-                               new StreamReduceInvokable<OUT>(clean(reducer)));
+               return transform("reduce", getType(), new 
StreamReduceInvokable<OUT>(clean(reducer)));
+
        }
 
        /**
@@ -461,8 +457,8 @@ public class DataStream<OUT> {
         * @return The filtered DataStream.
         */
        public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> 
filter) {
-               return addFunction("filter", clean(filter), getType(), 
getType(), new FilterInvokable<OUT>(clean(
-                               filter)));
+               return transform("filter", getType(), new 
FilterInvokable<OUT>(clean(filter)));
+
        }
 
        /**
@@ -742,7 +738,7 @@ public class DataStream<OUT> {
        public SingleOutputStreamOperator<Long, ?> count() {
                TypeInformation<Long> outTypeInfo = 
TypeExtractor.getForObject(Long.valueOf(0));
 
-               return addFunction("counter", null, getType(), outTypeInfo, new 
CounterInvokable<OUT>());
+               return transform("counter", outTypeInfo, new 
CounterInvokable<OUT>());
        }
 
        /**
@@ -1120,8 +1116,7 @@ public class DataStream<OUT> {
 
                StreamReduceInvokable<OUT> invokable = new 
StreamReduceInvokable<OUT>(aggregate);
 
-               SingleOutputStreamOperator<OUT, ?> returnStream = 
addFunction("reduce", clean(aggregate),
-                               typeInfo, typeInfo, invokable);
+               SingleOutputStreamOperator<OUT, ?> returnStream = 
transform("reduce", typeInfo, invokable);
 
                return returnStream;
        }
@@ -1137,34 +1132,28 @@ public class DataStream<OUT> {
        }
 
        /**
-        * Internal function for passing the user defined functions to the 
JobGraph
-        * of the job.
-        * 
-        * @param functionName
-        *            name of the function
-        * @param function
-        *            the user defined function
-        * @param functionInvokable
-        *            the wrapping JobVertex instance
+        * Method for passing user defined invokables along with the type
+        * informations that will transform the DataStream.
+        * 
+        * @param operatorName
+        *            name of the operator, for logging purposes
+        * @param outTypeInfo
+        *            the output type of the operator
+        * @param invokable
+        *            the object containing the transformation logic
         * @param <R>
         *            type of the return stream
         * @return the data stream constructed
         */
-       protected <R> SingleOutputStreamOperator<R, ?> addFunction(String 
functionName,
-                       final Function function, TypeInformation<OUT> 
inTypeInfo,
-                       TypeInformation<R> outTypeInfo, StreamInvokable<OUT, R> 
functionInvokable) {
+       public <R> SingleOutputStreamOperator<R, ?> transform(String 
operatorName,
+                       TypeInformation<R> outTypeInfo, StreamInvokable<OUT, R> 
invokable) {
                DataStream<OUT> inputStream = this.copy();
                @SuppressWarnings({ "unchecked", "rawtypes" })
                SingleOutputStreamOperator<R, ?> returnStream = new 
SingleOutputStreamOperator(environment,
-                               functionName, outTypeInfo);
+                               operatorName, outTypeInfo);
 
-               try {
-                       jobGraphBuilder.addStreamVertex(returnStream.getId(), 
functionInvokable, inTypeInfo,
-                                       outTypeInfo, functionName,
-                                       
SerializationUtils.serialize((Serializable) function), degreeOfParallelism);
-               } catch (SerializationException e) {
-                       throw new RuntimeException("Cannot serialize user 
defined function");
-               }
+               jobGraphBuilder.addStreamVertex(returnStream.getId(), 
invokable, getType(), outTypeInfo,
+                               operatorName, degreeOfParallelism);
 
                connectGraph(inputStream, returnStream.getId(), 0);
 
@@ -1235,13 +1224,8 @@ public class DataStream<OUT> {
                        SinkFunction<OUT> sinkFunction, TypeInformation<OUT> 
inTypeInfo) {
                DataStreamSink<OUT> returnStream = new 
DataStreamSink<OUT>(environment, "sink", typeInfo);
 
-               try {
-                       jobGraphBuilder.addStreamVertex(returnStream.getId(), 
new SinkInvokable<OUT>(
-                                       clean(sinkFunction)), inTypeInfo, null, 
"sink", SerializationUtils
-                                       .serialize(clean(sinkFunction)), 
degreeOfParallelism);
-               } catch (SerializationException e) {
-                       throw new RuntimeException("Cannot serialize 
SinkFunction");
-               }
+               jobGraphBuilder.addStreamVertex(returnStream.getId(), new 
SinkInvokable<OUT>(
+                               clean(sinkFunction)), inTypeInfo, null, "sink", 
degreeOfParallelism);
 
                inputStream.connectGraph(inputStream.copy(), 
returnStream.getId(), 0);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 2620d3e..a2c0f89 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.streaming.partitioner.StreamPartitioner;
  * partitioned by the given {@link KeySelector}. Operators like {@link 
#reduce},
  * {@link #batchReduce} etc. can be applied on the {@link GroupedDataStream} to
  * get additional functionality by the grouping.
- *
+ * 
  * @param <OUT>
  *            The output type of the {@link GroupedDataStream}.
  */
@@ -62,8 +62,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
         */
        @Override
        public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> 
reducer) {
-               return addFunction("groupReduce", clean(reducer), getType(), 
getType(),
-                               new GroupedReduceInvokable<OUT>(clean(reducer), 
keySelector));
+               return transform("groupReduce", getType(), new 
GroupedReduceInvokable<OUT>(clean(reducer),
+                               keySelector));
        }
 
        /**
@@ -182,8 +182,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> 
{
                GroupedReduceInvokable<OUT> invokable = new 
GroupedReduceInvokable<OUT>(clean(aggregate),
                                keySelector);
 
-               SingleOutputStreamOperator<OUT, ?> returnStream = 
addFunction("groupReduce", clean(aggregate),
-                               typeInfo, typeInfo, invokable);
+               SingleOutputStreamOperator<OUT, ?> returnStream = 
transform("groupReduce", typeInfo,
+                               invokable);
 
                return returnStream;
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index cc5f66e..e71b18c 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -83,8 +83,8 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple1<T0>> outType = 
(TypeInformation<Tuple1<T0>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.addFunction("projection", null, inTypeInfo, 
outType,
-                               new ProjectInvokable<IN, 
Tuple1<T0>>(fieldIndexes, outType));
+               return dataStream.transform("projection", outType, new 
ProjectInvokable<IN, Tuple1<T0>>(
+                               fieldIndexes, outType));
        }
 
        /**
@@ -111,7 +111,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple2<T0, T1>> outType = 
(TypeInformation<Tuple2<T0, T1>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.addFunction("projection", null, inTypeInfo, 
outType,
+               return dataStream.transform("projection", outType,
                                new ProjectInvokable<IN, Tuple2<T0, 
T1>>(fieldIndexes, outType));
        }
 
@@ -141,7 +141,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple3<T0, T1, T2>> outType = 
(TypeInformation<Tuple3<T0, T1, T2>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.addFunction("projection", null, inTypeInfo, 
outType,
+               return dataStream.transform("projection", outType,
                                new ProjectInvokable<IN, Tuple3<T0, T1, 
T2>>(fieldIndexes, outType));
        }
 
@@ -173,7 +173,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple4<T0, T1, T2, T3>> outType = 
(TypeInformation<Tuple4<T0, T1, T2, T3>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.addFunction("projection", null, inTypeInfo, 
outType,
+               return dataStream.transform("projection", outType,
                                new ProjectInvokable<IN, Tuple4<T0, T1, T2, 
T3>>(fieldIndexes, outType));
        }
 
@@ -206,14 +206,14 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple5<T0, T1, T2, T3, T4>> outType = 
(TypeInformation<Tuple5<T0, T1, T2, T3, T4>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.addFunction("projection", null, inTypeInfo, 
outType,
+               return dataStream.transform("projection", outType,
                                new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, 
T4>>(fieldIndexes, outType));
        }
 
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -227,7 +227,7 @@ public class StreamProjection<IN> {
         * @param type5
         *            The class of field '5' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -243,14 +243,14 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>> outType = 
(TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.addFunction("projection", null, inTypeInfo, 
outType,
+               return dataStream.transform("projection", outType,
                                new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, 
T4, T5>>(fieldIndexes, outType));
        }
 
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -266,7 +266,7 @@ public class StreamProjection<IN> {
         * @param type6
         *            The class of field '6' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -283,7 +283,7 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outType = 
(TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction("projection", null, inTypeInfo, 
outType,
+                               .transform("projection", outType,
                                                new ProjectInvokable<IN, 
Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
                                                                outType));
        }
@@ -291,7 +291,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -309,7 +309,7 @@ public class StreamProjection<IN> {
         * @param type7
         *            The class of field '7' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -325,7 +325,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outType 
= (TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.addFunction("projection", null, inTypeInfo, 
outType,
+               return dataStream.transform("projection", outType,
                                new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, 
T4, T5, T6, T7>>(fieldIndexes,
                                                outType));
        }
@@ -333,7 +333,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -353,7 +353,7 @@ public class StreamProjection<IN> {
         * @param type8
         *            The class of field '8' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -369,7 +369,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> 
outType = (TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>) 
extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.addFunction("projection", null, inTypeInfo, 
outType,
+               return dataStream.transform("projection", outType,
                                new ProjectInvokable<IN, Tuple9<T0, T1, T2, T3, 
T4, T5, T6, T7, T8>>(fieldIndexes,
                                                outType));
        }
@@ -377,7 +377,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -399,7 +399,7 @@ public class StreamProjection<IN> {
         * @param type9
         *            The class of field '9' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -415,7 +415,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, 
T9>> outType = (TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, 
T9>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.addFunction("projection", null, inTypeInfo, 
outType,
+               return dataStream.transform("projection", outType,
                                new ProjectInvokable<IN, Tuple10<T0, T1, T2, 
T3, T4, T5, T6, T7, T8, T9>>(
                                                fieldIndexes, outType));
        }
@@ -423,7 +423,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -447,7 +447,7 @@ public class StreamProjection<IN> {
         * @param type10
         *            The class of field '10' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -465,7 +465,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10>> outType = (TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, 
T9, T10>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.addFunction("projection", null, inTypeInfo, 
outType,
+               return dataStream.transform("projection", outType,
                                new ProjectInvokable<IN, Tuple11<T0, T1, T2, 
T3, T4, T5, T6, T7, T8, T9, T10>>(
                                                fieldIndexes, outType));
        }
@@ -473,7 +473,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -499,7 +499,7 @@ public class StreamProjection<IN> {
         * @param type11
         *            The class of field '11' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -518,10 +518,8 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11>> outType = (TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, 
T8, T9, T10, T11>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
                                                                fieldIndexes, 
outType));
@@ -530,7 +528,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -558,7 +556,7 @@ public class StreamProjection<IN> {
         * @param type12
         *            The class of field '12' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -577,10 +575,8 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12>> outType = (TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6, 
T7, T8, T9, T10, T11, T12>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
                                                                fieldIndexes, 
outType));
@@ -589,7 +585,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -619,7 +615,7 @@ public class StreamProjection<IN> {
         * @param type13
         *            The class of field '13' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -638,10 +634,8 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13>> outType = (TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5, 
T6, T7, T8, T9, T10, T11, T12, T13>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
                                                                fieldIndexes, 
outType));
@@ -650,7 +644,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -682,7 +676,7 @@ public class StreamProjection<IN> {
         * @param type14
         *            The class of field '14' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -702,10 +696,8 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14>> outType = (TypeInformation<Tuple15<T0, T1, T2, T3, 
T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
                                                                fieldIndexes, 
outType));
@@ -714,7 +706,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -748,7 +740,7 @@ public class StreamProjection<IN> {
         * @param type15
         *            The class of field '15' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -768,10 +760,8 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15>> outType = (TypeInformation<Tuple16<T0, T1, T2, 
T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
                                                                fieldIndexes, 
outType));
@@ -780,7 +770,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -816,7 +806,7 @@ public class StreamProjection<IN> {
         * @param type16
         *            The class of field '16' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -836,10 +826,8 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16>> outType = (TypeInformation<Tuple17<T0, T1, 
T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>) 
extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16>>(
                                                                fieldIndexes, 
outType));
@@ -848,7 +836,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -886,7 +874,7 @@ public class StreamProjection<IN> {
         * @param type17
         *            The class of field '17' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -906,10 +894,8 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17>> outType = (TypeInformation<Tuple18<T0, 
T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>) 
extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17>>(
                                                                fieldIndexes, 
outType));
@@ -918,7 +904,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -958,7 +944,7 @@ public class StreamProjection<IN> {
         * @param type18
         *            The class of field '18' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -979,10 +965,8 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18>> outType = 
(TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, 
T13, T14, T15, T16, T17, T18>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18>>(
                                                                fieldIndexes, 
outType));
@@ -991,7 +975,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -1033,7 +1017,7 @@ public class StreamProjection<IN> {
         * @param type19
         *            The class of field '19' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -1054,10 +1038,8 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> outType = 
(TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, 
T13, T14, T15, T16, T17, T18, T19>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18, T19>>(
                                                                fieldIndexes, 
outType));
@@ -1066,7 +1048,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -1110,7 +1092,7 @@ public class StreamProjection<IN> {
         * @param type20
         *            The class of field '20' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -1132,10 +1114,8 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> outType = 
(TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, 
T13, T14, T15, T16, T17, T18, T19, T20>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18, T19, T20>>(
                                                                fieldIndexes, 
outType));
@@ -1144,7 +1124,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -1190,7 +1170,7 @@ public class StreamProjection<IN> {
         * @param type21
         *            The class of field '21' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -1212,10 +1192,8 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> outType = 
(TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, 
T13, T14, T15, T16, T17, T18, T19, T20, T21>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18, T19, T20, T21>>(
                                                                fieldIndexes, 
outType));
@@ -1224,7 +1202,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -1272,7 +1250,7 @@ public class StreamProjection<IN> {
         * @param type22
         *            The class of field '22' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -1295,10 +1273,8 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> outType = 
(TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, 
T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18, T19, T20, T21, T22>>(
                                                                fieldIndexes, 
outType));
@@ -1307,7 +1283,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -1357,7 +1333,7 @@ public class StreamProjection<IN> {
         * @param type23
         *            The class of field '23' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -1380,10 +1356,8 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> outType 
= (TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, 
T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18, T19, T20, T21, T22, T23>>(
                                                                fieldIndexes, 
outType));
@@ -1392,7 +1366,7 @@ public class StreamProjection<IN> {
        /**
         * Projects a {@link Tuple} {@link DataStream} to the previously 
selected
         * fields. Requires the classes of the fields of the resulting Tuples.
-        *
+        * 
         * @param type0
         *            The class of field '0' of the result Tuples.
         * @param type1
@@ -1444,7 +1418,7 @@ public class StreamProjection<IN> {
         * @param type24
         *            The class of field '24' of the result Tuples.
         * @return The projected DataStream.
-        *
+        * 
         * @see Tuple
         * @see DataStream
         */
@@ -1467,10 +1441,9 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> 
outType = (TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, 
T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>) 
extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .addFunction(
+                               .transform(
                                                "projection",
-                                               null,
-                                               inTypeInfo,
+
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
                                                                fieldIndexes, 
outType));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 788f28d..cb9cd04 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -50,7 +50,7 @@ import 
org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
  * into windows (predefined chunks). User defined function such as
  * {@link #reduce(ReduceFunction)}, {@link #reduceGroup(GroupReduceFunction)} 
or
  * aggregations can be applied to the windows.
- *
+ * 
  * @param <OUT>
  *            The output type of the {@link WindowedDataStream}
  */
@@ -124,8 +124,8 @@ public class WindowedDataStream<OUT> {
                this.userEvicters = windowedDataStream.userEvicters;
                this.allCentral = windowedDataStream.allCentral;
        }
-       
-       public <F> F clean(F f){
+
+       public <F> F clean(F f) {
                return dataStream.clean(f);
        }
 
@@ -231,7 +231,7 @@ public class WindowedDataStream<OUT> {
         * @return The transformed DataStream
         */
        public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> 
reduceFunction) {
-               return dataStream.addFunction("NextGenWindowReduce", 
clean(reduceFunction), getType(), getType(),
+               return dataStream.transform("NextGenWindowReduce", getType(),
                                getReduceInvokable(reduceFunction));
        }
 
@@ -255,7 +255,7 @@ public class WindowedDataStream<OUT> {
                TypeInformation<R> outType = TypeExtractor
                                .getGroupReduceReturnTypes(reduceFunction, 
inType);
 
-               return dataStream.addFunction("NextGenWindowReduce", 
clean(reduceFunction), inType, outType,
+               return dataStream.transform("NextGenWindowReduce", outType,
                                getReduceGroupInvokable(reduceFunction));
        }
 
@@ -457,8 +457,8 @@ public class WindowedDataStream<OUT> {
        private SingleOutputStreamOperator<OUT, ?> 
aggregate(AggregationFunction<OUT> aggregator) {
                StreamInvokable<OUT, OUT> invokable = 
getReduceInvokable(aggregator);
 
-               SingleOutputStreamOperator<OUT, ?> returnStream = 
dataStream.addFunction("windowReduce",
-                               clean(aggregator), getType(), getType(), 
invokable);
+               SingleOutputStreamOperator<OUT, ?> returnStream = 
dataStream.transform("windowReduce",
+                               getType(), invokable);
 
                return returnStream;
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index d26b714..f50ab91 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -22,8 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
@@ -264,14 +262,10 @@ public abstract class StreamExecutionEnvironment {
                DataStreamSource<OUT> returnStream = new 
DataStreamSource<OUT>(this, "elements",
                                outTypeInfo);
 
-               try {
-                       SourceFunction<OUT> function = new 
FromElementsFunction<OUT>(data);
-                       jobGraphBuilder.addStreamVertex(returnStream.getId(),
-                                       new SourceInvokable<OUT>(function), 
null, outTypeInfo, "source",
-                                       SerializationUtils.serialize(function), 
1);
-               } catch (SerializationException e) {
-                       throw new RuntimeException("Cannot serialize elements");
-               }
+               SourceFunction<OUT> function = new 
FromElementsFunction<OUT>(data);
+               jobGraphBuilder.addStreamVertex(returnStream.getId(), new 
SourceInvokable<OUT>(function),
+                               null, outTypeInfo, "source", 1);
+
                return returnStream;
        }
 
@@ -300,15 +294,8 @@ public abstract class StreamExecutionEnvironment {
                DataStreamSource<OUT> returnStream = new 
DataStreamSource<OUT>(this, "collection",
                                outTypeInfo);
 
-               try {
-                       SourceFunction<OUT> function = new 
FromElementsFunction<OUT>(data);
-
-                       jobGraphBuilder.addStreamVertex(returnStream.getId(), 
new SourceInvokable<OUT>(
-                                       new FromElementsFunction<OUT>(data)), 
null, outTypeInfo, "source",
-                                       SerializationUtils.serialize(function), 
1);
-               } catch (SerializationException e) {
-                       throw new RuntimeException("Cannot serialize 
collection");
-               }
+               jobGraphBuilder.addStreamVertex(returnStream.getId(), new 
SourceInvokable<OUT>(
+                               new FromElementsFunction<OUT>(data)), null, 
outTypeInfo, "source", 1);
 
                return returnStream;
        }
@@ -317,7 +304,7 @@ public abstract class StreamExecutionEnvironment {
         * Creates a new DataStream that contains the strings received 
infinitely
         * from socket. Received strings are decoded by the system's default
         * character set.
-        *
+        * 
         * @param hostname
         *            The host name which a server socket bind.
         * @param port
@@ -335,7 +322,7 @@ public abstract class StreamExecutionEnvironment {
         * Creates a new DataStream that contains the strings received 
infinitely
         * from socket. Received strings are decoded by the system's default
         * character set, uses '\n' as delimiter.
-        *
+        * 
         * @param hostname
         *            The host name which a server socket bind.
         * @param port
@@ -378,12 +365,8 @@ public abstract class StreamExecutionEnvironment {
 
                DataStreamSource<OUT> returnStream = new 
DataStreamSource<OUT>(this, "source", outTypeInfo);
 
-               try {
-                       jobGraphBuilder.addSourceVertex(returnStream.getId(), 
function, null, outTypeInfo,
-                                       "source", 
SerializationUtils.serialize(function), getDegreeOfParallelism());
-               } catch (SerializationException e) {
-                       throw new RuntimeException("Cannot serialize 
SourceFunction");
-               }
+               jobGraphBuilder.addStreamVertex(returnStream.getId(), new 
SourceInvokable<OUT>(function),
+                               null, outTypeInfo, "source", 1);
 
                return returnStream;
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index cb960dd..8afac75 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -1,45 +1,49 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.flink.util.Collector;
-
-public class FromElementsFunction<T> implements SourceFunction<T> {
-       private static final long serialVersionUID = 1L;
-
-       Iterable<T> iterable;
-
-       public FromElementsFunction(T... elements) {
-               this.iterable = Arrays.asList(elements);
-       }
-
-       public FromElementsFunction(Collection<T> elements) {
-               this.iterable = elements;
-       }
-
-       @Override
-       public void invoke(Collector<T> collector) throws Exception {
-               for (T element : iterable) {
-                       collector.collect(element);
-               }
-       }
-
-}
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.util.Collector;
+
+public class FromElementsFunction<T> implements SourceFunction<T> {
+       private static final long serialVersionUID = 1L;
+
+       Iterable<T> iterable;
+
+       public FromElementsFunction(T... elements) {
+               this.iterable = Arrays.asList(elements);
+       }
+
+       public FromElementsFunction(Collection<T> elements) {
+               this.iterable = elements;
+       }
+
+       public FromElementsFunction(Iterable<T> elements) {
+               this.iterable = elements;
+       }
+
+       @Override
+       public void invoke(Collector<T> collector) throws Exception {
+               for (T element : iterable) {
+                       collector.collect(element);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 7504efd..d786d6b 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -37,7 +37,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable 
implements StreamTa
        protected int instanceID;
        protected String name;
        private static int numVertices = 0;
-       protected Object function;
+
        protected String functionName;
 
        private InputHandler<IN> inputHandler;
@@ -72,7 +72,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable 
implements StreamTa
                this.configuration = new StreamConfig(getTaskConfiguration());
                this.name = configuration.getVertexName();
                this.functionName = configuration.getFunctionName();
-               this.function = configuration.getFunction(userClassLoader);
                this.states = configuration.getOperatorStates(userClassLoader);
                this.context = createRuntimeContext(name, this.states);
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index b902655..4ccbb98 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -50,6 +50,12 @@ under the License.
                        <artifactId>flink-compiler</artifactId>
                        <version>${project.version}</version>
                </dependency>
+        
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
                <dependency>
                        <groupId>org.scala-lang</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
new file mode 100644
index 0000000..711ce7c
--- /dev/null
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import scala.reflect.ClassTag
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+
+class DataStream[OUT](javaStream: JavaStream[OUT]) {
+
+  /* This code is originally from the Apache Spark project. */
+  /**
+   * Clean a closure to make it ready to be serialized and sent to tasks
+   * (removes unreferenced variables in $outer's, updates REPL variables)
+   * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
+   * check to see if <tt>f</tt> is serializable and throw a 
<tt>SparkException</tt>
+   * if not.
+   *
+   * @param f the closure to clean
+   * @param checkSerializable whether or not to immediately check <tt>f</tt> 
for serializability
+   * @throws <tt>SparkException</tt> if <tt>checkSerializable</tt> is set but 
<tt>f</tt> is not
+   *   serializable
+   */
+  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = 
true): F = {
+    ClosureCleaner.clean(f, checkSerializable)
+    f
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element 
of this DataStream.
+   */
+  def map[R: TypeInformation: ClassTag](fun: OUT => R): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+    val mapper = new MapFunction[OUT, R] {
+      val cleanFun = clean(fun)
+      def map(in: OUT): R = cleanFun(in)
+    }
+
+    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], 
new MapInvokable[OUT, R](mapper)))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element 
of this DataStream.
+   */
+  def map[R: TypeInformation: ClassTag](mapper: MapFunction[OUT, R]): 
DataStream[R] = {
+    if (mapper == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+
+    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], 
new MapInvokable[OUT, R](mapper)))
+  }
+
+  def print() = javaStream.print()
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
new file mode 100644
index 0000000..df6c561
--- /dev/null
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+
+import org.apache.flink.streaming.api.environment.{ StreamExecutionEnvironment 
=> JavaEnv }
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.commons.lang.Validate
+import scala.reflect.ClassTag
+import org.apache.flink.streaming.api.datastream.DataStreamSource
+import org.apache.flink.streaming.api.invokable.SourceInvokable
+import org.apache.flink.streaming.api.function.source.FromElementsFunction
+
+class StreamExecutionEnvironment(javaEnv: JavaEnv) {
+
+  /**
+   * Sets the degree of parallelism (DOP) for operations executed through this 
environment.
+   * Setting a DOP of x here will cause all operators (such as join, map, 
reduce) to run with
+   * x parallel instances. This value can be overridden by specific operations 
using
+   * [[DataStream.setParallelism]].
+   */
+  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
+    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+  }
+
+  /**
+   * Returns the default degree of parallelism for this execution environment. 
Note that this
+   * value can be overridden by individual operations using 
[[DataStream.setParallelism]]
+   */
+  def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
+
+  def generateSequence(from: Long, to: Long): DataStream[java.lang.Long] = new 
DataStream(javaEnv.generateSequence(from, to))
+
+  /**
+   * Creates a new data stream that contains the given elements. The elements 
must all be of the
+   * same type and must be serializable.
+   *
+   * Note that this operation will result in a non-parallel data source, 
i.e. a data source with
+   * a degree of parallelism of one.
+   */
+  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
+  }
+
+  /**
+   * Creates a DataStream from the given non-empty [[Seq]]. The elements need 
to be serializable
+   * because the framework may move the elements into the cluster if needed.
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. 
a data source with
+   * a degree of parallelism of one.
+   */
+  def fromCollection[T: ClassTag: TypeInformation](
+    data: Seq[T]): DataStream[T] = {
+    Validate.notNull(data, "Data must not be null.")
+    val typeInfo = implicitly[TypeInformation[T]]
+    val returnStream = new DataStreamSource[T](javaEnv,
+      "elements", typeInfo);
+
+    javaEnv.getJobGraphBuilder.addStreamVertex(returnStream.getId(),
+      new SourceInvokable[T](new 
FromElementsFunction[T](scala.collection.JavaConversions.asJavaCollection(data))),
 null, typeInfo,
+      "source", 1);
+    new DataStream(returnStream)
+  }
+
+  def execute() = javaEnv.execute()
+
+}
+
+object StreamExecutionEnvironment {
+
+  /**
+   * Creates an execution environment that represents the context in which the 
program is
+   * currently executed. If the program is invoked standalone, this method 
returns a local
+   * execution environment. If the program is invoked from within the command 
line client
+   * to be submitted to a cluster, this method returns the execution 
environment of this cluster.
+   */
+  def getExecutionEnvironment: StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
+  }
+
+  /**
+   * Creates a local execution environment. The local execution environment 
will run the program in
+   * a multi-threaded fashion in the same JVM as the environment was created 
in. The default degree
+   * of parallelism of the local environment is the number of hardware 
contexts (CPU cores/threads).
+   */
+  def createLocalEnvironment(
+    degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()): 
StreamExecutionEnvironment = {
+    new 
StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
+  }
+
+  /**
+   * Creates a remote execution environment. The remote environment sends 
(parts of) the program to
+   * a cluster for execution. Note that all file paths used in the program 
must be accessible from
+   * the cluster. The execution will use the cluster's default degree of 
parallelism, unless the
+   * parallelism is set explicitly via 
[[StreamExecutionEnvironment.setDegreeOfParallelism()]].
+   *
+   * @param host The host name or address of the master (JobManager),
+   *             where the program should be executed.
+   * @param port The port of the master (JobManager), where the program should 
be executed.
+   * @param jarFiles The JAR files with code that needs to be shipped to the 
cluster. If the
+   *                 program uses
+   *                 user-defined functions, user-defined input formats, or 
any libraries,
+   *                 those must be
+   *                 provided in the JAR files.
+   */
+  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): 
StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, 
jarFiles: _*))
+  }
+
+  /**
+   * Creates a remote execution environment. The remote environment sends 
(parts of) the program
+   * to a cluster for execution. Note that all file paths used in the program 
must be accessible
+   * from the cluster. The execution will use the specified degree of 
parallelism.
+   *
+   * @param host The host name or address of the master (JobManager),
+   *             where the program should be executed.
+   * @param port The port of the master (JobManager), where the program should 
be executed.
+   * @param degreeOfParallelism The degree of parallelism to use during the 
execution.
+   * @param jarFiles The JAR files with code that needs to be shipped to the 
cluster. If the
+   *                 program uses
+   *                 user-defined functions, user-defined input formats, or 
any libraries,
+   *                 those must be
+   *                 provided in the JAR files.
+   */
+  def createRemoteEnvironment(
+    host: String,
+    port: Int,
+    degreeOfParallelism: Int,
+    jarFiles: String*): StreamExecutionEnvironment = {
+    val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
+    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+    new StreamExecutionEnvironment(javaEnv)
+  }
+}
\ No newline at end of file

Reply via email to