[scala] [streaming] Base functionality added for the streaming Scala API
Conflicts:
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b1fd156
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b1fd156
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b1fd156
Branch: refs/heads/release-0.8
Commit: 6b1fd156a774a8292dd2e9227d611dcca5b9c526
Parents: b2271bd
Author: Gyula Fora <[email protected]>
Authored: Thu Dec 11 15:22:03 2014 +0100
Committer: mbalassi <[email protected]>
Committed: Mon Jan 5 17:49:00 2015 +0100
----------------------------------------------------------------------
.../flink/streaming/api/JobGraphBuilder.java | 37 ++---
.../flink/streaming/api/StreamConfig.java | 16 --
.../api/datastream/ConnectedDataStream.java | 47 +++---
.../streaming/api/datastream/DataStream.java | 70 ++++----
.../api/datastream/GroupedDataStream.java | 10 +-
.../api/datastream/StreamProjection.java | 161 ++++++++-----------
.../api/datastream/WindowedDataStream.java | 14 +-
.../environment/StreamExecutionEnvironment.java | 38 ++---
.../function/source/FromElementsFunction.java | 90 ++++++-----
.../api/streamvertex/StreamVertex.java | 3 +-
flink-scala/pom.xml | 6 +
.../flink/api/scala/streaming/DataStream.scala | 76 +++++++++
.../streaming/StreamExecutionEnvironment.scala | 150 +++++++++++++++++
13 files changed, 425 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6b1fd156/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index d66e388..f358de9 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -72,7 +72,6 @@ public class JobGraphBuilder {
private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
- private Map<String, byte[]> serializedFunctions;
private Map<String, byte[]> outputSelectors;
private Map<String, Class<? extends AbstractInvokable>> vertexClasses;
private Map<String, Integer> iterationIds;
@@ -104,7 +103,6 @@ public class JobGraphBuilder {
typeSerializersIn2 = new HashMap<String,
StreamRecordSerializer<?>>();
typeSerializersOut1 = new HashMap<String,
StreamRecordSerializer<?>>();
typeSerializersOut2 = new HashMap<String,
StreamRecordSerializer<?>>();
- serializedFunctions = new HashMap<String, byte[]>();
outputSelectors = new HashMap<String, byte[]>();
vertexClasses = new HashMap<String, Class<? extends
AbstractInvokable>>();
iterationIds = new HashMap<String, Integer>();
@@ -133,18 +131,14 @@ public class JobGraphBuilder {
* Output type for serialization
* @param operatorName
* Operator type
- * @param serializedFunction
- * Serialized udf
* @param parallelism
* Number of parallel instances created
*/
public <IN, OUT> void addStreamVertex(String vertexName,
StreamInvokable<IN, OUT> invokableObject,
TypeInformation<IN> inTypeInfo,
- TypeInformation<OUT> outTypeInfo, String operatorName,
byte[] serializedFunction,
- int parallelism) {
+ TypeInformation<OUT> outTypeInfo, String operatorName,
int parallelism) {
- addVertex(vertexName, StreamVertex.class, invokableObject,
operatorName,
- serializedFunction, parallelism);
+ addVertex(vertexName, StreamVertex.class, invokableObject,
operatorName, parallelism);
StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ?
new StreamRecordSerializer<IN>(
inTypeInfo) : null;
@@ -171,8 +165,6 @@ public class JobGraphBuilder {
* Output type for serialization
* @param operatorName
* Operator type
- * @param serializedFunction
- * Serialized udf
* @param parallelism
* Number of parallel instances created
*/
@@ -185,7 +177,7 @@ public class JobGraphBuilder {
function);
addStreamVertex(vertexName, invokableObject, inTypeInfo,
outTypeInfo, operatorName,
- serializedFunction, parallelism);
+ parallelism);
}
/**
@@ -206,7 +198,7 @@ public class JobGraphBuilder {
public void addIterationHead(String vertexName, String iterationHead,
Integer iterationID,
int parallelism, long waitTime) {
- addVertex(vertexName, StreamIterationHead.class, null, null,
null, parallelism);
+ addVertex(vertexName, StreamIterationHead.class, null, null,
parallelism);
iterationIds.put(vertexName, iterationID);
iterationIDtoHeadName.put(iterationID, vertexName);
@@ -247,7 +239,7 @@ public class JobGraphBuilder {
throw new RuntimeException("Buffer timeout 0 at
iteration tail is not supported.");
}
- addVertex(vertexName, StreamIterationTail.class, null, null,
null, parallelism);
+ addVertex(vertexName, StreamIterationTail.class, null, null,
parallelism);
iterationIds.put(vertexName, iterationID);
iterationIDtoTailName.put(iterationID, vertexName);
@@ -264,10 +256,9 @@ public class JobGraphBuilder {
public <IN1, IN2, OUT> void addCoTask(String vertexName,
CoInvokable<IN1, IN2, OUT> taskInvokableObject,
TypeInformation<IN1> in1TypeInfo,
TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT>
outTypeInfo,
- String operatorName, byte[] serializedFunction, int
parallelism) {
+ String operatorName, int parallelism) {
- addVertex(vertexName, CoStreamVertex.class,
taskInvokableObject, operatorName,
- serializedFunction, parallelism);
+ addVertex(vertexName, CoStreamVertex.class,
taskInvokableObject, operatorName, parallelism);
addTypeSerializers(vertexName, new
StreamRecordSerializer<IN1>(in1TypeInfo),
new StreamRecordSerializer<IN2>(in2TypeInfo),
new StreamRecordSerializer<OUT>(
@@ -289,20 +280,16 @@ public class JobGraphBuilder {
* The user defined invokable object
* @param operatorName
* Type of the user defined operator
- * @param serializedFunction
- * Serialized operator
* @param parallelism
* Number of parallel instances created
*/
private void addVertex(String vertexName, Class<? extends
AbstractInvokable> vertexClass,
- StreamInvokable<?, ?> invokableObject, String
operatorName, byte[] serializedFunction,
- int parallelism) {
+ StreamInvokable<?, ?> invokableObject, String
operatorName, int parallelism) {
vertexClasses.put(vertexName, vertexClass);
setParallelism(vertexName, parallelism);
invokableObjects.put(vertexName, invokableObject);
operatorNames.put(vertexName, operatorName);
- serializedFunctions.put(vertexName, serializedFunction);
outEdgeList.put(vertexName, new ArrayList<String>());
outEdgeType.put(vertexName, new ArrayList<Integer>());
outEdgeNames.put(vertexName, new ArrayList<List<String>>());
@@ -333,8 +320,6 @@ public class JobGraphBuilder {
// Get vertex attributes
Class<? extends AbstractInvokable> vertexClass =
vertexClasses.get(vertexName);
StreamInvokable<?, ?> invokableObject =
invokableObjects.get(vertexName);
- String operatorName = operatorNames.get(vertexName);
- byte[] serializedFunction = serializedFunctions.get(vertexName);
int parallelism = vertexParallelism.get(vertexName);
byte[] outputSelector = outputSelectors.get(vertexName);
Map<String, OperatorState<?>> state =
operatorStates.get(vertexName);
@@ -362,7 +347,6 @@ public class JobGraphBuilder {
// Set vertex config
config.setUserInvokable(invokableObject);
config.setVertexName(vertexName);
- config.setFunction(serializedFunction, operatorName);
config.setOutputSelector(outputSelector);
config.setOperatorStates(state);
@@ -522,8 +506,8 @@ public class JobGraphBuilder {
}
/**
- * Sets udf operator and TypeSerializerWrapper from one vertex to
another,
- * used with some sinks.
+ * Sets TypeSerializerWrapper from one vertex to another, used with some
+ * sinks.
*
* @param from
* from
@@ -532,7 +516,6 @@ public class JobGraphBuilder {
*/
public void setBytesFrom(String from, String to) {
operatorNames.put(to, operatorNames.get(from));
- serializedFunctions.put(to, serializedFunctions.get(from));
typeSerializersIn1.put(to, typeSerializersOut1.get(from));
typeSerializersIn2.put(to, typeSerializersOut2.get(from));
http://git-wip-us.apache.org/repos/asf/flink/blob/6b1fd156/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 9800b63..8837b85 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -47,7 +47,6 @@ public class StreamConfig {
private static final String OUTPUT_SELECTOR = "outputSelector";
private static final String DIRECTED_EMIT = "directedEmit";
private static final String FUNCTION_NAME = "operatorName";
- private static final String FUNCTION = "operator";
private static final String VERTEX_NAME = "vertexName";
private static final String SERIALIZEDUDF = "serializedudf";
private static final String USER_FUNCTION = "userfunction";
@@ -173,21 +172,6 @@ public class StreamConfig {
return config.getString(VERTEX_NAME, null);
}
- public void setFunction(byte[] serializedFunction, String functionName)
{
- if (serializedFunction != null) {
- config.setBytes(FUNCTION, serializedFunction);
- config.setString(FUNCTION_NAME, functionName);
- }
- }
-
- public Object getFunction(ClassLoader cl) {
- try {
- return
InstantiationUtil.readObjectFromConfig(this.config, FUNCTION, cl);
- } catch (Exception e) {
- throw new RuntimeException("Cannot deserialize
invokable object", e);
- }
- }
-
public String getFunctionName() {
return config.getString(FUNCTION_NAME, "");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b1fd156/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index dcc3dab..c663315 100755
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -17,13 +17,9 @@
package org.apache.flink.streaming.api.datastream;
-import java.io.Serializable;
import java.util.List;
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
@@ -53,7 +49,7 @@ import org.apache.flink.util.Collector;
* The ConnectedDataStream represents a stream for two different data types. It
* can be used to apply transformations like {@link CoMapFunction} on two
* {@link DataStream}s
- *
+ *
* @param <IN1>
* Type of the first input data steam.
* @param <IN2>
@@ -417,8 +413,9 @@ public class ConnectedDataStream<IN1, IN2> {
TypeInformation<OUT> outTypeInfo =
TypeExtractor.createTypeInfo(CoMapFunction.class,
coMapper.getClass(), 2, null, null);
- return addCoFunction("coMap", clean(coMapper), outTypeInfo,
- new CoMapInvokable<IN1, IN2,
OUT>(clean(coMapper)));
+ return addCoFunction("coMap", outTypeInfo, new
CoMapInvokable<IN1, IN2, OUT>(
+ clean(coMapper)));
+
}
/**
@@ -441,8 +438,8 @@ public class ConnectedDataStream<IN1, IN2> {
TypeInformation<OUT> outTypeInfo =
TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
coFlatMapper.getClass(), 2, null, null);
- return addCoFunction("coFlatMap", clean(coFlatMapper),
outTypeInfo,
- new CoFlatMapInvokable<IN1, IN2,
OUT>(clean(coFlatMapper)));
+ return addCoFunction("coFlatMap", outTypeInfo, new
CoFlatMapInvokable<IN1, IN2, OUT>(
+ clean(coFlatMapper)));
}
/**
@@ -466,8 +463,8 @@ public class ConnectedDataStream<IN1, IN2> {
TypeInformation<OUT> outTypeInfo =
TypeExtractor.createTypeInfo(CoReduceFunction.class,
coReducer.getClass(), 2, null, null);
- return addCoFunction("coReduce", clean(coReducer), outTypeInfo,
- getReduceInvokable(clean(coReducer)));
+ return addCoFunction("coReduce", outTypeInfo,
getReduceInvokable(clean(coReducer)));
+
}
/**
@@ -531,9 +528,9 @@ public class ConnectedDataStream<IN1, IN2> {
TypeInformation<OUT> outTypeInfo =
TypeExtractor.createTypeInfo(CoWindowFunction.class,
coWindowFunction.getClass(), 2, null, null);
- return addCoFunction("coWindowReduce", clean(coWindowFunction),
outTypeInfo,
- new CoWindowInvokable<IN1, IN2,
OUT>(clean(coWindowFunction), windowSize, slideInterval,
- timestamp1, timestamp2));
+ return addCoFunction("coWindowReduce", outTypeInfo, new
CoWindowInvokable<IN1, IN2, OUT>(
+ clean(coWindowFunction), windowSize,
slideInterval, timestamp1, timestamp2));
+
}
protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
@@ -607,27 +604,21 @@ public class ConnectedDataStream<IN1, IN2> {
throw new IllegalArgumentException("Slide interval must
be positive");
}
- return addCoFunction("coWindowReduce", clean(coWindowFunction),
outTypeInfo,
- new CoWindowInvokable<IN1, IN2,
OUT>(clean(coWindowFunction), windowSize, slideInterval,
- timestamp1, timestamp2));
+ return addCoFunction("coWindowReduce", outTypeInfo, new
CoWindowInvokable<IN1, IN2, OUT>(
+ clean(coWindowFunction), windowSize,
slideInterval, timestamp1, timestamp2));
+
}
- protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String
functionName,
- final Function function, TypeInformation<OUT>
outTypeInfo,
- CoInvokable<IN1, IN2, OUT> functionInvokable) {
+ public <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String
functionName,
+ TypeInformation<OUT> outTypeInfo, CoInvokable<IN1, IN2,
OUT> functionInvokable) {
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<OUT, ?> returnStream = new
SingleOutputStreamOperator(
environment, functionName, outTypeInfo);
- try {
-
dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
- getInputType1(), getInputType2(),
outTypeInfo, functionName,
-
SerializationUtils.serialize((Serializable) function),
- environment.getDegreeOfParallelism());
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize user
defined function");
- }
+ dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(),
functionInvokable,
+ getInputType1(), getInputType2(), outTypeInfo,
functionName,
+ environment.getDegreeOfParallelism());
dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
http://git-wip-us.apache.org/repos/asf/flink/blob/6b1fd156/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 6e8da0a..04929c1 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -17,15 +17,11 @@
package org.apache.flink.streaming.api.datastream;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
@@ -401,8 +397,7 @@ public class DataStream<OUT> {
TypeInformation<R> outType =
TypeExtractor.getMapReturnTypes(clean(mapper), getType());
- return addFunction("map", clean(mapper), getType(), outType,
new MapInvokable<OUT, R>(
- clean(mapper)));
+ return transform("map", outType, new MapInvokable<OUT,
R>(clean(mapper)));
}
/**
@@ -423,10 +418,11 @@ public class DataStream<OUT> {
*/
public <R> SingleOutputStreamOperator<R, ?>
flatMap(FlatMapFunction<OUT, R> flatMapper) {
- TypeInformation<R> outType =
TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType());
+ TypeInformation<R> outType =
TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
+ getType());
+
+ return transform("flatMap", outType, new FlatMapInvokable<OUT,
R>(clean(flatMapper)));
- return addFunction("flatMap", clean(flatMapper), getType(),
outType,
- new FlatMapInvokable<OUT,
R>(clean(flatMapper)));
}
/**
@@ -442,8 +438,8 @@ public class DataStream<OUT> {
*/
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT>
reducer) {
- return addFunction("reduce", clean(reducer), getType(),
getType(),
- new StreamReduceInvokable<OUT>(clean(reducer)));
+ return transform("reduce", getType(), new
StreamReduceInvokable<OUT>(clean(reducer)));
+
}
/**
@@ -461,8 +457,8 @@ public class DataStream<OUT> {
* @return The filtered DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT>
filter) {
- return addFunction("filter", clean(filter), getType(),
getType(), new FilterInvokable<OUT>(clean(
- filter)));
+ return transform("filter", getType(), new
FilterInvokable<OUT>(clean(filter)));
+
}
/**
@@ -742,7 +738,7 @@ public class DataStream<OUT> {
public SingleOutputStreamOperator<Long, ?> count() {
TypeInformation<Long> outTypeInfo =
TypeExtractor.getForObject(Long.valueOf(0));
- return addFunction("counter", null, getType(), outTypeInfo, new
CounterInvokable<OUT>());
+ return transform("counter", outTypeInfo, new
CounterInvokable<OUT>());
}
/**
@@ -1120,8 +1116,7 @@ public class DataStream<OUT> {
StreamReduceInvokable<OUT> invokable = new
StreamReduceInvokable<OUT>(aggregate);
- SingleOutputStreamOperator<OUT, ?> returnStream =
addFunction("reduce", clean(aggregate),
- typeInfo, typeInfo, invokable);
+ SingleOutputStreamOperator<OUT, ?> returnStream =
transform("reduce", typeInfo, invokable);
return returnStream;
}
@@ -1137,34 +1132,28 @@ public class DataStream<OUT> {
}
/**
- * Internal function for passing the user defined functions to the
JobGraph
- * of the job.
- *
- * @param functionName
- * name of the function
- * @param function
- * the user defined function
- * @param functionInvokable
- * the wrapping JobVertex instance
+ * Method for passing user defined invokables along with the type
+ * informations that will transform the DataStream.
+ *
+ * @param operatorName
+ * name of the operator, for logging purposes
+ * @param outTypeInfo
+ * the output type of the operator
+ * @param invokable
+ * the object containing the transformation logic
* @param <R>
* type of the return stream
* @return the data stream constructed
*/
- protected <R> SingleOutputStreamOperator<R, ?> addFunction(String
functionName,
- final Function function, TypeInformation<OUT>
inTypeInfo,
- TypeInformation<R> outTypeInfo, StreamInvokable<OUT, R>
functionInvokable) {
+ public <R> SingleOutputStreamOperator<R, ?> transform(String
operatorName,
+ TypeInformation<R> outTypeInfo, StreamInvokable<OUT, R>
invokable) {
DataStream<OUT> inputStream = this.copy();
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R, ?> returnStream = new
SingleOutputStreamOperator(environment,
- functionName, outTypeInfo);
+ operatorName, outTypeInfo);
- try {
- jobGraphBuilder.addStreamVertex(returnStream.getId(),
functionInvokable, inTypeInfo,
- outTypeInfo, functionName,
-
SerializationUtils.serialize((Serializable) function), degreeOfParallelism);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize user
defined function");
- }
+ jobGraphBuilder.addStreamVertex(returnStream.getId(),
invokable, getType(), outTypeInfo,
+ operatorName, degreeOfParallelism);
connectGraph(inputStream, returnStream.getId(), 0);
@@ -1235,13 +1224,8 @@ public class DataStream<OUT> {
SinkFunction<OUT> sinkFunction, TypeInformation<OUT>
inTypeInfo) {
DataStreamSink<OUT> returnStream = new
DataStreamSink<OUT>(environment, "sink", typeInfo);
- try {
- jobGraphBuilder.addStreamVertex(returnStream.getId(),
new SinkInvokable<OUT>(
- clean(sinkFunction)), inTypeInfo, null,
"sink", SerializationUtils
- .serialize(clean(sinkFunction)),
degreeOfParallelism);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize
SinkFunction");
- }
+ jobGraphBuilder.addStreamVertex(returnStream.getId(), new
SinkInvokable<OUT>(
+ clean(sinkFunction)), inTypeInfo, null, "sink",
degreeOfParallelism);
inputStream.connectGraph(inputStream.copy(),
returnStream.getId(), 0);
http://git-wip-us.apache.org/repos/asf/flink/blob/6b1fd156/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 2620d3e..a2c0f89 100755
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -29,7 +29,7 @@ import
org.apache.flink.streaming.partitioner.StreamPartitioner;
* partitioned by the given {@link KeySelector}. Operators like {@link
#reduce},
* {@link #batchReduce} etc. can be applied on the {@link GroupedDataStream} to
* get additional functionality by the grouping.
- *
+ *
* @param <OUT>
* The output type of the {@link GroupedDataStream}.
*/
@@ -62,8 +62,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
*/
@Override
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT>
reducer) {
- return addFunction("groupReduce", clean(reducer), getType(),
getType(),
- new GroupedReduceInvokable<OUT>(clean(reducer),
keySelector));
+ return transform("groupReduce", getType(), new
GroupedReduceInvokable<OUT>(clean(reducer),
+ keySelector));
}
/**
@@ -182,8 +182,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT>
{
GroupedReduceInvokable<OUT> invokable = new
GroupedReduceInvokable<OUT>(clean(aggregate),
keySelector);
- SingleOutputStreamOperator<OUT, ?> returnStream =
addFunction("groupReduce", clean(aggregate),
- typeInfo, typeInfo, invokable);
+ SingleOutputStreamOperator<OUT, ?> returnStream =
transform("groupReduce", typeInfo,
+ invokable);
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b1fd156/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index cc5f66e..e71b18c 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -83,8 +83,8 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple1<T0>> outType =
(TypeInformation<Tuple1<T0>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
- return dataStream.addFunction("projection", null, inTypeInfo,
outType,
- new ProjectInvokable<IN,
Tuple1<T0>>(fieldIndexes, outType));
+ return dataStream.transform("projection", outType, new
ProjectInvokable<IN, Tuple1<T0>>(
+ fieldIndexes, outType));
}
/**
@@ -111,7 +111,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple2<T0, T1>> outType =
(TypeInformation<Tuple2<T0, T1>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
- return dataStream.addFunction("projection", null, inTypeInfo,
outType,
+ return dataStream.transform("projection", outType,
new ProjectInvokable<IN, Tuple2<T0,
T1>>(fieldIndexes, outType));
}
@@ -141,7 +141,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple3<T0, T1, T2>> outType =
(TypeInformation<Tuple3<T0, T1, T2>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
- return dataStream.addFunction("projection", null, inTypeInfo,
outType,
+ return dataStream.transform("projection", outType,
new ProjectInvokable<IN, Tuple3<T0, T1,
T2>>(fieldIndexes, outType));
}
@@ -173,7 +173,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple4<T0, T1, T2, T3>> outType =
(TypeInformation<Tuple4<T0, T1, T2, T3>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
- return dataStream.addFunction("projection", null, inTypeInfo,
outType,
+ return dataStream.transform("projection", outType,
new ProjectInvokable<IN, Tuple4<T0, T1, T2,
T3>>(fieldIndexes, outType));
}
@@ -206,14 +206,14 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple5<T0, T1, T2, T3, T4>> outType =
(TypeInformation<Tuple5<T0, T1, T2, T3, T4>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
- return dataStream.addFunction("projection", null, inTypeInfo,
outType,
+ return dataStream.transform("projection", outType,
new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3,
T4>>(fieldIndexes, outType));
}
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -227,7 +227,7 @@ public class StreamProjection<IN> {
* @param type5
* The class of field '5' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -243,14 +243,14 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>> outType =
(TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
- return dataStream.addFunction("projection", null, inTypeInfo,
outType,
+ return dataStream.transform("projection", outType,
new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3,
T4, T5>>(fieldIndexes, outType));
}
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -266,7 +266,7 @@ public class StreamProjection<IN> {
* @param type6
* The class of field '6' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -283,7 +283,7 @@ public class StreamProjection<IN> {
TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outType =
(TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction("projection", null, inTypeInfo,
outType,
+ .transform("projection", outType,
new ProjectInvokable<IN,
Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
outType));
}
@@ -291,7 +291,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -309,7 +309,7 @@ public class StreamProjection<IN> {
* @param type7
* The class of field '7' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -325,7 +325,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outType
= (TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
- return dataStream.addFunction("projection", null, inTypeInfo,
outType,
+ return dataStream.transform("projection", outType,
new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3,
T4, T5, T6, T7>>(fieldIndexes,
outType));
}
@@ -333,7 +333,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -353,7 +353,7 @@ public class StreamProjection<IN> {
* @param type8
* The class of field '8' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -369,7 +369,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>
outType = (TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>)
extractFieldTypes(
fieldIndexes, types, inTypeInfo);
- return dataStream.addFunction("projection", null, inTypeInfo,
outType,
+ return dataStream.transform("projection", outType,
new ProjectInvokable<IN, Tuple9<T0, T1, T2, T3,
T4, T5, T6, T7, T8>>(fieldIndexes,
outType));
}
@@ -377,7 +377,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -399,7 +399,7 @@ public class StreamProjection<IN> {
* @param type9
* The class of field '9' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -415,7 +415,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8,
T9>> outType = (TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8,
T9>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
- return dataStream.addFunction("projection", null, inTypeInfo,
outType,
+ return dataStream.transform("projection", outType,
new ProjectInvokable<IN, Tuple10<T0, T1, T2,
T3, T4, T5, T6, T7, T8, T9>>(
fieldIndexes, outType));
}
@@ -423,7 +423,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -447,7 +447,7 @@ public class StreamProjection<IN> {
* @param type10
* The class of field '10' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -465,7 +465,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10>> outType = (TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8,
T9, T10>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
- return dataStream.addFunction("projection", null, inTypeInfo,
outType,
+ return dataStream.transform("projection", outType,
new ProjectInvokable<IN, Tuple11<T0, T1, T2,
T3, T4, T5, T6, T7, T8, T9, T10>>(
fieldIndexes, outType));
}
@@ -473,7 +473,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -499,7 +499,7 @@ public class StreamProjection<IN> {
* @param type11
* The class of field '11' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -518,10 +518,8 @@ public class StreamProjection<IN> {
TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11>> outType = (TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7,
T8, T9, T10, T11>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
outType,
new ProjectInvokable<IN,
Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
fieldIndexes,
outType));
@@ -530,7 +528,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -558,7 +556,7 @@ public class StreamProjection<IN> {
* @param type12
* The class of field '12' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -577,10 +575,8 @@ public class StreamProjection<IN> {
TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12>> outType = (TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6,
T7, T8, T9, T10, T11, T12>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
outType,
new ProjectInvokable<IN,
Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
fieldIndexes,
outType));
@@ -589,7 +585,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -619,7 +615,7 @@ public class StreamProjection<IN> {
* @param type13
* The class of field '13' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -638,10 +634,8 @@ public class StreamProjection<IN> {
TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12, T13>> outType = (TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5,
T6, T7, T8, T9, T10, T11, T12, T13>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
outType,
new ProjectInvokable<IN,
Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
fieldIndexes,
outType));
@@ -650,7 +644,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -682,7 +676,7 @@ public class StreamProjection<IN> {
* @param type14
* The class of field '14' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -702,10 +696,8 @@ public class StreamProjection<IN> {
TypeInformation<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12, T13, T14>> outType = (TypeInformation<Tuple15<T0, T1, T2, T3,
T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
outType,
new ProjectInvokable<IN,
Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
fieldIndexes,
outType));
@@ -714,7 +706,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -748,7 +740,7 @@ public class StreamProjection<IN> {
* @param type15
* The class of field '15' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -768,10 +760,8 @@ public class StreamProjection<IN> {
TypeInformation<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12, T13, T14, T15>> outType = (TypeInformation<Tuple16<T0, T1, T2,
T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
outType,
new ProjectInvokable<IN,
Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
fieldIndexes,
outType));
@@ -780,7 +770,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -816,7 +806,7 @@ public class StreamProjection<IN> {
* @param type16
* The class of field '16' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -836,10 +826,8 @@ public class StreamProjection<IN> {
TypeInformation<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12, T13, T14, T15, T16>> outType = (TypeInformation<Tuple17<T0, T1,
T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>)
extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
outType,
new ProjectInvokable<IN,
Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15,
T16>>(
fieldIndexes,
outType));
@@ -848,7 +836,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -886,7 +874,7 @@ public class StreamProjection<IN> {
* @param type17
* The class of field '17' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -906,10 +894,8 @@ public class StreamProjection<IN> {
TypeInformation<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12, T13, T14, T15, T16, T17>> outType = (TypeInformation<Tuple18<T0,
T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>)
extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
outType,
new ProjectInvokable<IN,
Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15,
T16, T17>>(
fieldIndexes,
outType));
@@ -918,7 +904,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -958,7 +944,7 @@ public class StreamProjection<IN> {
* @param type18
* The class of field '18' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -979,10 +965,8 @@ public class StreamProjection<IN> {
TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12, T13, T14, T15, T16, T17, T18>> outType =
(TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12,
T13, T14, T15, T16, T17, T18>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
outType,
new ProjectInvokable<IN,
Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15,
T16, T17, T18>>(
fieldIndexes,
outType));
@@ -991,7 +975,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -1033,7 +1017,7 @@ public class StreamProjection<IN> {
* @param type19
* The class of field '19' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -1054,10 +1038,8 @@ public class StreamProjection<IN> {
TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> outType =
(TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12,
T13, T14, T15, T16, T17, T18, T19>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
outType,
new ProjectInvokable<IN,
Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15,
T16, T17, T18, T19>>(
fieldIndexes,
outType));
@@ -1066,7 +1048,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -1110,7 +1092,7 @@ public class StreamProjection<IN> {
* @param type20
* The class of field '20' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -1132,10 +1114,8 @@ public class StreamProjection<IN> {
TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> outType =
(TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12,
T13, T14, T15, T16, T17, T18, T19, T20>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
outType,
new ProjectInvokable<IN,
Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15,
T16, T17, T18, T19, T20>>(
fieldIndexes,
outType));
@@ -1144,7 +1124,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -1190,7 +1170,7 @@ public class StreamProjection<IN> {
* @param type21
* The class of field '21' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -1212,10 +1192,8 @@ public class StreamProjection<IN> {
TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> outType =
(TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12,
T13, T14, T15, T16, T17, T18, T19, T20, T21>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
outType,
new ProjectInvokable<IN,
Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15,
T16, T17, T18, T19, T20, T21>>(
fieldIndexes,
outType));
@@ -1224,7 +1202,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -1272,7 +1250,7 @@ public class StreamProjection<IN> {
* @param type22
* The class of field '22' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -1295,10 +1273,8 @@ public class StreamProjection<IN> {
TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> outType =
(TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12,
T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
outType,
new ProjectInvokable<IN,
Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15,
T16, T17, T18, T19, T20, T21, T22>>(
fieldIndexes,
outType));
@@ -1307,7 +1283,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -1357,7 +1333,7 @@ public class StreamProjection<IN> {
* @param type23
* The class of field '23' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -1380,10 +1356,8 @@ public class StreamProjection<IN> {
TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> outType
= (TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11,
T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
outType,
new ProjectInvokable<IN,
Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15,
T16, T17, T18, T19, T20, T21, T22, T23>>(
fieldIndexes,
outType));
@@ -1392,7 +1366,7 @@ public class StreamProjection<IN> {
/**
* Projects a {@link Tuple} {@link DataStream} to the previously
selected
* fields. Requires the classes of the fields of the resulting Tuples.
- *
+ *
* @param type0
* The class of field '0' of the result Tuples.
* @param type1
@@ -1444,7 +1418,7 @@ public class StreamProjection<IN> {
* @param type24
* The class of field '24' of the result Tuples.
* @return The projected DataStream.
- *
+ *
* @see Tuple
* @see DataStream
*/
@@ -1467,10 +1441,9 @@ public class StreamProjection<IN> {
TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9,
T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>
outType = (TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10,
T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>)
extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
- .addFunction(
+ .transform(
"projection",
- null,
- inTypeInfo,
+
outType,
new ProjectInvokable<IN,
Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15,
T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
fieldIndexes,
outType));
http://git-wip-us.apache.org/repos/asf/flink/blob/6b1fd156/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 788f28d..cb9cd04 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -50,7 +50,7 @@ import
org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
* into windows (predefined chunks). User defined function such as
* {@link #reduce(ReduceFunction)}, {@link #reduceGroup(GroupReduceFunction)}
or
* aggregations can be applied to the windows.
- *
+ *
* @param <OUT>
* The output type of the {@link WindowedDataStream}
*/
@@ -124,8 +124,8 @@ public class WindowedDataStream<OUT> {
this.userEvicters = windowedDataStream.userEvicters;
this.allCentral = windowedDataStream.allCentral;
}
-
- public <F> F clean(F f){
+
+ public <F> F clean(F f) {
return dataStream.clean(f);
}
@@ -231,7 +231,7 @@ public class WindowedDataStream<OUT> {
* @return The transformed DataStream
*/
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT>
reduceFunction) {
- return dataStream.addFunction("NextGenWindowReduce",
clean(reduceFunction), getType(), getType(),
+ return dataStream.transform("NextGenWindowReduce", getType(),
getReduceInvokable(reduceFunction));
}
@@ -255,7 +255,7 @@ public class WindowedDataStream<OUT> {
TypeInformation<R> outType = TypeExtractor
.getGroupReduceReturnTypes(reduceFunction,
inType);
- return dataStream.addFunction("NextGenWindowReduce",
clean(reduceFunction), inType, outType,
+ return dataStream.transform("NextGenWindowReduce", outType,
getReduceGroupInvokable(reduceFunction));
}
@@ -457,8 +457,8 @@ public class WindowedDataStream<OUT> {
private SingleOutputStreamOperator<OUT, ?>
aggregate(AggregationFunction<OUT> aggregator) {
StreamInvokable<OUT, OUT> invokable =
getReduceInvokable(aggregator);
- SingleOutputStreamOperator<OUT, ?> returnStream =
dataStream.addFunction("windowReduce",
- clean(aggregator), getType(), getType(),
invokable);
+ SingleOutputStreamOperator<OUT, ?> returnStream =
dataStream.transform("windowReduce",
+ getType(), invokable);
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b1fd156/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 59d56aa..e9581a1 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -22,8 +22,6 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.List;
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
@@ -264,14 +262,10 @@ public abstract class StreamExecutionEnvironment {
DataStreamSource<OUT> returnStream = new
DataStreamSource<OUT>(this, "elements",
outTypeInfo);
- try {
- SourceFunction<OUT> function = new
FromElementsFunction<OUT>(data);
- jobGraphBuilder.addStreamVertex(returnStream.getId(),
- new SourceInvokable<OUT>(function),
null, outTypeInfo, "source",
- SerializationUtils.serialize(function),
1);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize elements");
- }
+ SourceFunction<OUT> function = new
FromElementsFunction<OUT>(data);
+ jobGraphBuilder.addStreamVertex(returnStream.getId(), new
SourceInvokable<OUT>(function),
+ null, outTypeInfo, "source", 1);
+
return returnStream;
}
@@ -300,15 +294,8 @@ public abstract class StreamExecutionEnvironment {
DataStreamSource<OUT> returnStream = new
DataStreamSource<OUT>(this, "collection",
outTypeInfo);
- try {
- SourceFunction<OUT> function = new
FromElementsFunction<OUT>(data);
-
- jobGraphBuilder.addStreamVertex(returnStream.getId(),
new SourceInvokable<OUT>(
- new FromElementsFunction<OUT>(data)),
null, outTypeInfo, "source",
- SerializationUtils.serialize(function),
1);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize
collection");
- }
+ jobGraphBuilder.addStreamVertex(returnStream.getId(), new
SourceInvokable<OUT>(
+ new FromElementsFunction<OUT>(data)), null,
outTypeInfo, "source", 1);
return returnStream;
}
@@ -317,7 +304,7 @@ public abstract class StreamExecutionEnvironment {
* Creates a new DataStream that contains the strings received
infinitely
* from socket. Received strings are decoded by the system's default
* character set.
- *
+ *
* @param hostname
* The host name which a server socket bind.
* @param port
@@ -335,7 +322,7 @@ public abstract class StreamExecutionEnvironment {
* Creates a new DataStream that contains the strings received
infinitely
* from socket. Received strings are decoded by the system's default
* character set, uses '\n' as delimiter.
- *
+ *
* @param hostname
* The host name which a server socket bind.
* @param port
@@ -378,13 +365,8 @@ public abstract class StreamExecutionEnvironment {
DataStreamSource<OUT> returnStream = new
DataStreamSource<OUT>(this, "source", outTypeInfo);
- try {
- //currently parallel sources are not directly supported
- jobGraphBuilder.addSourceVertex(returnStream.getId(),
function, null, outTypeInfo,
- "source",
SerializationUtils.serialize(function), 1);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize
SourceFunction");
- }
+ jobGraphBuilder.addStreamVertex(returnStream.getId(), new
SourceInvokable<OUT>(function),
+ null, outTypeInfo, "source", 1);
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b1fd156/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index cb960dd..8afac75 100755
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -1,45 +1,49 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.flink.util.Collector;
-
-public class FromElementsFunction<T> implements SourceFunction<T> {
- private static final long serialVersionUID = 1L;
-
- Iterable<T> iterable;
-
- public FromElementsFunction(T... elements) {
- this.iterable = Arrays.asList(elements);
- }
-
- public FromElementsFunction(Collection<T> elements) {
- this.iterable = elements;
- }
-
- @Override
- public void invoke(Collector<T> collector) throws Exception {
- for (T element : iterable) {
- collector.collect(element);
- }
- }
-
-}
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.util.Collector;
+
+public class FromElementsFunction<T> implements SourceFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ Iterable<T> iterable;
+
+ public FromElementsFunction(T... elements) {
+ this.iterable = Arrays.asList(elements);
+ }
+
+ public FromElementsFunction(Collection<T> elements) {
+ this.iterable = elements;
+ }
+
+ public FromElementsFunction(Iterable<T> elements) {
+ this.iterable = elements;
+ }
+
+ @Override
+ public void invoke(Collector<T> collector) throws Exception {
+ for (T element : iterable) {
+ collector.collect(element);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b1fd156/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 7504efd..d786d6b 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -37,7 +37,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable
implements StreamTa
protected int instanceID;
protected String name;
private static int numVertices = 0;
- protected Object function;
+
protected String functionName;
private InputHandler<IN> inputHandler;
@@ -72,7 +72,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable
implements StreamTa
this.configuration = new StreamConfig(getTaskConfiguration());
this.name = configuration.getVertexName();
this.functionName = configuration.getFunctionName();
- this.function = configuration.getFunction(userClassLoader);
this.states = configuration.getOperatorStates(userClassLoader);
this.context = createRuntimeContext(name, this.states);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b1fd156/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index e2d71e1..f55fc71 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -50,6 +50,12 @@ under the License.
<artifactId>flink-compiler</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.scala-lang</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/6b1fd156/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
new file mode 100644
index 0000000..711ce7c
--- /dev/null
+++
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import scala.reflect.ClassTag
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+
+class DataStream[OUT](javaStream: JavaStream[OUT]) {
+
+ /* This code is originally from the Apache Spark project. */
+ /**
+ * Clean a closure to make it ready to be serialized and sent to tasks
+ * (removes unreferenced variables in $outer's, updates REPL variables)
+ * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
+ * check to see if <tt>f</tt> is serializable and throw a
<tt>SparkException</tt>
+ * if not.
+ *
+ * @param f the closure to clean
+ * @param checkSerializable whether or not to immediately check <tt>f</tt>
for serializability
+ * @throws <tt>SparkException</tt> if <tt>checkSerializable</tt> is set but
<tt>f</tt> is not
+ * serializable
+ */
+ private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean =
true): F = {
+ ClosureCleaner.clean(f, checkSerializable)
+ f
+ }
+
+ /**
+ * Creates a new DataStream by applying the given function to every element
of this DataStream.
+ */
+ def map[R: TypeInformation: ClassTag](fun: OUT => R): DataStream[R] = {
+ if (fun == null) {
+ throw new NullPointerException("Map function must not be null.")
+ }
+ val mapper = new MapFunction[OUT, R] {
+ val cleanFun = clean(fun)
+ def map(in: OUT): R = cleanFun(in)
+ }
+
+ new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]],
new MapInvokable[OUT, R](mapper)))
+ }
+
+ /**
+ * Creates a new DataStream by applying the given function to every element
of this DataStream.
+ */
+ def map[R: TypeInformation: ClassTag](mapper: MapFunction[OUT, R]):
DataStream[R] = {
+ if (mapper == null) {
+ throw new NullPointerException("Map function must not be null.")
+ }
+
+ new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]],
new MapInvokable[OUT, R](mapper)))
+ }
+
+ def print() = javaStream.print()
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/6b1fd156/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
new file mode 100644
index 0000000..df6c561
--- /dev/null
+++
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+
+import org.apache.flink.streaming.api.environment.{ StreamExecutionEnvironment
=> JavaEnv }
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.commons.lang.Validate
+import scala.reflect.ClassTag
+import org.apache.flink.streaming.api.datastream.DataStreamSource
+import org.apache.flink.streaming.api.invokable.SourceInvokable
+import org.apache.flink.streaming.api.function.source.FromElementsFunction
+
+class StreamExecutionEnvironment(javaEnv: JavaEnv) {
+
+ /**
+ * Sets the degree of parallelism (DOP) for operations executed through this
environment.
+ * Setting a DOP of x here will cause all operators (such as join, map,
reduce) to run with
+ * x parallel instances. This value can be overridden by specific operations
using
+ * [[DataStream.setParallelism]].
+ */
+ def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
+ javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+ }
+
+ /**
+ * Returns the default degree of parallelism for this execution environment.
Note that this
+ * value can be overridden by individual operations using
[[DataStream.setParallelism]]
+ */
+ def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
+
+ def generateSequence(from: Long, to: Long): DataStream[java.lang.Long] = new
DataStream(javaEnv.generateSequence(from, to))
+
+ /**
+ * Creates a new data stream that contains the given elements. The elements
must all be of the
+ * same type and must be serializable.
+ *
+ * Note that this operation will result in a non-parallel data source,
i.e. a data source with
+ * a degree of parallelism of one.
+ */
+ def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
+ val typeInfo = implicitly[TypeInformation[T]]
+ fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
+ }
+
+ /**
+ * Creates a DataStream from the given non-empty [[Seq]]. The elements need
to be serializable
+ * because the framework may move the elements into the cluster if needed.
+ *
+ * Note that this operation will result in a non-parallel data source, i.e.
a data source with
+ * a degree of parallelism of one.
+ */
+ def fromCollection[T: ClassTag: TypeInformation](
+ data: Seq[T]): DataStream[T] = {
+ Validate.notNull(data, "Data must not be null.")
+ val typeInfo = implicitly[TypeInformation[T]]
+ val returnStream = new DataStreamSource[T](javaEnv,
+ "elements", typeInfo);
+
+ javaEnv.getJobGraphBuilder.addStreamVertex(returnStream.getId(),
+ new SourceInvokable[T](new
FromElementsFunction[T](scala.collection.JavaConversions.asJavaCollection(data))),
null, typeInfo,
+ "source", 1);
+ new DataStream(returnStream)
+ }
+
+ def execute() = javaEnv.execute()
+
+}
+
+object StreamExecutionEnvironment {
+
+ /**
+ * Creates an execution environment that represents the context in which the
program is
+ * currently executed. If the program is invoked standalone, this method
returns a local
+ * execution environment. If the program is invoked from within the command
line client
+ * to be submitted to a cluster, this method returns the execution
environment of this cluster.
+ */
+ def getExecutionEnvironment: StreamExecutionEnvironment = {
+ new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
+ }
+
+ /**
+ * Creates a local execution environment. The local execution environment
will run the program in
+ * a multi-threaded fashion in the same JVM as the environment was created
in. The default degree
+ * of parallelism of the local environment is the number of hardware
contexts (CPU cores/threads).
+ */
+ def createLocalEnvironment(
+ degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()):
StreamExecutionEnvironment = {
+ new
StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
+ }
+
+ /**
+ * Creates a remote execution environment. The remote environment sends
(parts of) the program to
+ * a cluster for execution. Note that all file paths used in the program
must be accessible from
+ * the cluster. The execution will use the cluster's default degree of
parallelism, unless the
+ * parallelism is set explicitly via
[[StreamExecutionEnvironment.setDegreeOfParallelism]].
+ *
+ * @param host The host name or address of the master (JobManager),
+ * where the program should be executed.
+ * @param port The port of the master (JobManager), where the program should
be executed.
+ * @param jarFiles The JAR files with code that needs to be shipped to the
cluster. If the
+ * program uses
+ * user-defined functions, user-defined input formats, or
any libraries,
+ * those must be
+ * provided in the JAR files.
+ */
+ def createRemoteEnvironment(host: String, port: Int, jarFiles: String*):
StreamExecutionEnvironment = {
+ new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port,
jarFiles: _*))
+ }
+
+ /**
+ * Creates a remote execution environment. The remote environment sends
(parts of) the program
+ * to a cluster for execution. Note that all file paths used in the program
must be accessible
+ * from the cluster. The execution will use the specified degree of
parallelism.
+ *
+ * @param host The host name or address of the master (JobManager),
+ * where the program should be executed.
+ * @param port The port of the master (JobManager), where the program should
be executed.
+ * @param degreeOfParallelism The degree of parallelism to use during the
execution.
+ * @param jarFiles The JAR files with code that needs to be shipped to the
cluster. If the
+ * program uses
+ * user-defined functions, user-defined input formats, or
any libraries,
+ * those must be
+ * provided in the JAR files.
+ */
+ def createRemoteEnvironment(
+ host: String,
+ port: Int,
+ degreeOfParallelism: Int,
+ jarFiles: String*): StreamExecutionEnvironment = {
+ val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
+ javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+ new StreamExecutionEnvironment(javaEnv)
+ }
+}
\ No newline at end of file