[scala] [streaming] Base functionality added for streaming scala api
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/34353f66 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/34353f66 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/34353f66 Branch: refs/heads/master Commit: 34353f6658e9a4dd50ad860e17eee94804b76ccb Parents: 87d699d Author: Gyula Fora <[email protected]> Authored: Thu Dec 11 15:22:03 2014 +0100 Committer: Gyula Fora <[email protected]> Committed: Fri Jan 2 18:34:38 2015 +0100 ---------------------------------------------------------------------- .../flink/streaming/api/JobGraphBuilder.java | 37 ++--- .../flink/streaming/api/StreamConfig.java | 16 -- .../api/datastream/ConnectedDataStream.java | 47 +++--- .../streaming/api/datastream/DataStream.java | 70 ++++---- .../api/datastream/GroupedDataStream.java | 10 +- .../api/datastream/StreamProjection.java | 161 ++++++++----------- .../api/datastream/WindowedDataStream.java | 14 +- .../environment/StreamExecutionEnvironment.java | 37 ++--- .../function/source/FromElementsFunction.java | 90 ++++++----- .../api/streamvertex/StreamVertex.java | 3 +- flink-scala/pom.xml | 6 + .../flink/api/scala/streaming/DataStream.scala | 76 +++++++++ .../streaming/StreamExecutionEnvironment.scala | 150 +++++++++++++++++ 13 files changed, 425 insertions(+), 292 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java index d66e388..f358de9 100644 
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java @@ -72,7 +72,6 @@ public class JobGraphBuilder { private Map<String, StreamRecordSerializer<?>> typeSerializersIn2; private Map<String, StreamRecordSerializer<?>> typeSerializersOut1; private Map<String, StreamRecordSerializer<?>> typeSerializersOut2; - private Map<String, byte[]> serializedFunctions; private Map<String, byte[]> outputSelectors; private Map<String, Class<? extends AbstractInvokable>> vertexClasses; private Map<String, Integer> iterationIds; @@ -104,7 +103,6 @@ public class JobGraphBuilder { typeSerializersIn2 = new HashMap<String, StreamRecordSerializer<?>>(); typeSerializersOut1 = new HashMap<String, StreamRecordSerializer<?>>(); typeSerializersOut2 = new HashMap<String, StreamRecordSerializer<?>>(); - serializedFunctions = new HashMap<String, byte[]>(); outputSelectors = new HashMap<String, byte[]>(); vertexClasses = new HashMap<String, Class<? 
extends AbstractInvokable>>(); iterationIds = new HashMap<String, Integer>(); @@ -133,18 +131,14 @@ public class JobGraphBuilder { * Output type for serialization * @param operatorName * Operator type - * @param serializedFunction - * Serialized udf * @param parallelism * Number of parallel instances created */ public <IN, OUT> void addStreamVertex(String vertexName, StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo, - TypeInformation<OUT> outTypeInfo, String operatorName, byte[] serializedFunction, - int parallelism) { + TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) { - addVertex(vertexName, StreamVertex.class, invokableObject, operatorName, - serializedFunction, parallelism); + addVertex(vertexName, StreamVertex.class, invokableObject, operatorName, parallelism); StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>( inTypeInfo) : null; @@ -171,8 +165,6 @@ public class JobGraphBuilder { * Output type for serialization * @param operatorName * Operator type - * @param serializedFunction - * Serialized udf * @param parallelism * Number of parallel instances created */ @@ -185,7 +177,7 @@ public class JobGraphBuilder { function); addStreamVertex(vertexName, invokableObject, inTypeInfo, outTypeInfo, operatorName, - serializedFunction, parallelism); + parallelism); } /** @@ -206,7 +198,7 @@ public class JobGraphBuilder { public void addIterationHead(String vertexName, String iterationHead, Integer iterationID, int parallelism, long waitTime) { - addVertex(vertexName, StreamIterationHead.class, null, null, null, parallelism); + addVertex(vertexName, StreamIterationHead.class, null, null, parallelism); iterationIds.put(vertexName, iterationID); iterationIDtoHeadName.put(iterationID, vertexName); @@ -247,7 +239,7 @@ public class JobGraphBuilder { throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported."); } - addVertex(vertexName, StreamIterationTail.class, 
null, null, null, parallelism); + addVertex(vertexName, StreamIterationTail.class, null, null, parallelism); iterationIds.put(vertexName, iterationID); iterationIDtoTailName.put(iterationID, vertexName); @@ -264,10 +256,9 @@ public class JobGraphBuilder { public <IN1, IN2, OUT> void addCoTask(String vertexName, CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, - String operatorName, byte[] serializedFunction, int parallelism) { + String operatorName, int parallelism) { - addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, - serializedFunction, parallelism); + addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism); addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo), new StreamRecordSerializer<IN2>(in2TypeInfo), new StreamRecordSerializer<OUT>( @@ -289,20 +280,16 @@ public class JobGraphBuilder { * The user defined invokable object * @param operatorName * Type of the user defined operator - * @param serializedFunction - * Serialized operator * @param parallelism * Number of parallel instances created */ private void addVertex(String vertexName, Class<? extends AbstractInvokable> vertexClass, - StreamInvokable<?, ?> invokableObject, String operatorName, byte[] serializedFunction, - int parallelism) { + StreamInvokable<?, ?> invokableObject, String operatorName, int parallelism) { vertexClasses.put(vertexName, vertexClass); setParallelism(vertexName, parallelism); invokableObjects.put(vertexName, invokableObject); operatorNames.put(vertexName, operatorName); - serializedFunctions.put(vertexName, serializedFunction); outEdgeList.put(vertexName, new ArrayList<String>()); outEdgeType.put(vertexName, new ArrayList<Integer>()); outEdgeNames.put(vertexName, new ArrayList<List<String>>()); @@ -333,8 +320,6 @@ public class JobGraphBuilder { // Get vertex attributes Class<? 
extends AbstractInvokable> vertexClass = vertexClasses.get(vertexName); StreamInvokable<?, ?> invokableObject = invokableObjects.get(vertexName); - String operatorName = operatorNames.get(vertexName); - byte[] serializedFunction = serializedFunctions.get(vertexName); int parallelism = vertexParallelism.get(vertexName); byte[] outputSelector = outputSelectors.get(vertexName); Map<String, OperatorState<?>> state = operatorStates.get(vertexName); @@ -362,7 +347,6 @@ public class JobGraphBuilder { // Set vertex config config.setUserInvokable(invokableObject); config.setVertexName(vertexName); - config.setFunction(serializedFunction, operatorName); config.setOutputSelector(outputSelector); config.setOperatorStates(state); @@ -522,8 +506,8 @@ public class JobGraphBuilder { } /** - * Sets udf operator and TypeSerializerWrapper from one vertex to another, - * used with some sinks. + * Sets TypeSerializerWrapper from one vertex to another, used with some + * sinks. * * @param from * from @@ -532,7 +516,6 @@ public class JobGraphBuilder { */ public void setBytesFrom(String from, String to) { operatorNames.put(to, operatorNames.get(from)); - serializedFunctions.put(to, serializedFunctions.get(from)); typeSerializersIn1.put(to, typeSerializersOut1.get(from)); typeSerializersIn2.put(to, typeSerializersOut2.get(from)); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index 9800b63..8837b85 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -47,7 +47,6 @@ public class StreamConfig { private static final String OUTPUT_SELECTOR = "outputSelector"; private static final String DIRECTED_EMIT = "directedEmit"; private static final String FUNCTION_NAME = "operatorName"; - private static final String FUNCTION = "operator"; private static final String VERTEX_NAME = "vertexName"; private static final String SERIALIZEDUDF = "serializedudf"; private static final String USER_FUNCTION = "userfunction"; @@ -173,21 +172,6 @@ public class StreamConfig { return config.getString(VERTEX_NAME, null); } - public void setFunction(byte[] serializedFunction, String functionName) { - if (serializedFunction != null) { - config.setBytes(FUNCTION, serializedFunction); - config.setString(FUNCTION_NAME, functionName); - } - } - - public Object getFunction(ClassLoader cl) { - try { - return InstantiationUtil.readObjectFromConfig(this.config, FUNCTION, cl); - } catch (Exception e) { - throw new RuntimeException("Cannot deserialize invokable object", e); - } - } - public String getFunctionName() { return config.getString(FUNCTION_NAME, ""); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index dcc3dab..c663315 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -17,13 +17,9 @@ package org.apache.flink.streaming.api.datastream; -import java.io.Serializable; import java.util.List; -import org.apache.commons.lang3.SerializationException; -import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.api.common.functions.CrossFunction; -import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ClosureCleaner; @@ -53,7 +49,7 @@ import org.apache.flink.util.Collector; * The ConnectedDataStream represents a stream for two different data types. It * can be used to apply transformations like {@link CoMapFunction} on two * {@link DataStream}s - * + * * @param <IN1> * Type of the first input data steam. * @param <IN2> @@ -417,8 +413,9 @@ public class ConnectedDataStream<IN1, IN2> { TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class, coMapper.getClass(), 2, null, null); - return addCoFunction("coMap", clean(coMapper), outTypeInfo, - new CoMapInvokable<IN1, IN2, OUT>(clean(coMapper))); + return addCoFunction("coMap", outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>( + clean(coMapper))); + } /** @@ -441,8 +438,8 @@ public class ConnectedDataStream<IN1, IN2> { TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class, coFlatMapper.getClass(), 2, null, null); - return addCoFunction("coFlatMap", clean(coFlatMapper), outTypeInfo, - new CoFlatMapInvokable<IN1, IN2, OUT>(clean(coFlatMapper))); + return addCoFunction("coFlatMap", outTypeInfo, new CoFlatMapInvokable<IN1, IN2, OUT>( + clean(coFlatMapper))); } /** @@ -466,8 +463,8 @@ public class ConnectedDataStream<IN1, IN2> { TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class, 
coReducer.getClass(), 2, null, null); - return addCoFunction("coReduce", clean(coReducer), outTypeInfo, - getReduceInvokable(clean(coReducer))); + return addCoFunction("coReduce", outTypeInfo, getReduceInvokable(clean(coReducer))); + } /** @@ -531,9 +528,9 @@ public class ConnectedDataStream<IN1, IN2> { TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class, coWindowFunction.getClass(), 2, null, null); - return addCoFunction("coWindowReduce", clean(coWindowFunction), outTypeInfo, - new CoWindowInvokable<IN1, IN2, OUT>(clean(coWindowFunction), windowSize, slideInterval, - timestamp1, timestamp2)); + return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>( + clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2)); + } protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable( @@ -607,27 +604,21 @@ public class ConnectedDataStream<IN1, IN2> { throw new IllegalArgumentException("Slide interval must be positive"); } - return addCoFunction("coWindowReduce", clean(coWindowFunction), outTypeInfo, - new CoWindowInvokable<IN1, IN2, OUT>(clean(coWindowFunction), windowSize, slideInterval, - timestamp1, timestamp2)); + return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>( + clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2)); + } - protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName, - final Function function, TypeInformation<OUT> outTypeInfo, - CoInvokable<IN1, IN2, OUT> functionInvokable) { + public <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName, + TypeInformation<OUT> outTypeInfo, CoInvokable<IN1, IN2, OUT> functionInvokable) { @SuppressWarnings({ "unchecked", "rawtypes" }) SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator( environment, functionName, outTypeInfo); - try { - dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), 
functionInvokable, - getInputType1(), getInputType2(), outTypeInfo, functionName, - SerializationUtils.serialize((Serializable) function), - environment.getDegreeOfParallelism()); - } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize user defined function"); - } + dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, + getInputType1(), getInputType2(), outTypeInfo, functionName, + environment.getDegreeOfParallelism()); dataStream1.connectGraph(dataStream1, returnStream.getId(), 1); dataStream1.connectGraph(dataStream2, returnStream.getId(), 2); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 6e8da0a..04929c1 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -17,15 +17,11 @@ package org.apache.flink.streaming.api.datastream; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import org.apache.commons.lang3.SerializationException; -import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import 
org.apache.flink.api.common.functions.RichFilterFunction; @@ -401,8 +397,7 @@ public class DataStream<OUT> { TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType()); - return addFunction("map", clean(mapper), getType(), outType, new MapInvokable<OUT, R>( - clean(mapper))); + return transform("map", outType, new MapInvokable<OUT, R>(clean(mapper))); } /** @@ -423,10 +418,11 @@ public class DataStream<OUT> { */ public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) { - TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType()); + TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), + getType()); + + return transform("flatMap", outType, new FlatMapInvokable<OUT, R>(clean(flatMapper))); - return addFunction("flatMap", clean(flatMapper), getType(), outType, - new FlatMapInvokable<OUT, R>(clean(flatMapper))); } /** @@ -442,8 +438,8 @@ public class DataStream<OUT> { */ public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) { - return addFunction("reduce", clean(reducer), getType(), getType(), - new StreamReduceInvokable<OUT>(clean(reducer))); + return transform("reduce", getType(), new StreamReduceInvokable<OUT>(clean(reducer))); + } /** @@ -461,8 +457,8 @@ public class DataStream<OUT> { * @return The filtered DataStream. 
*/ public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) { - return addFunction("filter", clean(filter), getType(), getType(), new FilterInvokable<OUT>(clean( - filter))); + return transform("filter", getType(), new FilterInvokable<OUT>(clean(filter))); + } /** @@ -742,7 +738,7 @@ public class DataStream<OUT> { public SingleOutputStreamOperator<Long, ?> count() { TypeInformation<Long> outTypeInfo = TypeExtractor.getForObject(Long.valueOf(0)); - return addFunction("counter", null, getType(), outTypeInfo, new CounterInvokable<OUT>()); + return transform("counter", outTypeInfo, new CounterInvokable<OUT>()); } /** @@ -1120,8 +1116,7 @@ public class DataStream<OUT> { StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate); - SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", clean(aggregate), - typeInfo, typeInfo, invokable); + SingleOutputStreamOperator<OUT, ?> returnStream = transform("reduce", typeInfo, invokable); return returnStream; } @@ -1137,34 +1132,28 @@ public class DataStream<OUT> { } /** - * Internal function for passing the user defined functions to the JobGraph - * of the job. - * - * @param functionName - * name of the function - * @param function - * the user defined function - * @param functionInvokable - * the wrapping JobVertex instance + * Method for passing user defined invokables along with the type + * informations that will transform the DataStream. 
+ * + * @param operatorName + * name of the operator, for logging purposes + * @param outTypeInfo + * the output type of the operator + * @param invokable + * the object containing the transformation logic * @param <R> * type of the return stream * @return the data stream constructed */ - protected <R> SingleOutputStreamOperator<R, ?> addFunction(String functionName, - final Function function, TypeInformation<OUT> inTypeInfo, - TypeInformation<R> outTypeInfo, StreamInvokable<OUT, R> functionInvokable) { + public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, + TypeInformation<R> outTypeInfo, StreamInvokable<OUT, R> invokable) { DataStream<OUT> inputStream = this.copy(); @SuppressWarnings({ "unchecked", "rawtypes" }) SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment, - functionName, outTypeInfo); + operatorName, outTypeInfo); - try { - jobGraphBuilder.addStreamVertex(returnStream.getId(), functionInvokable, inTypeInfo, - outTypeInfo, functionName, - SerializationUtils.serialize((Serializable) function), degreeOfParallelism); - } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize user defined function"); - } + jobGraphBuilder.addStreamVertex(returnStream.getId(), invokable, getType(), outTypeInfo, + operatorName, degreeOfParallelism); connectGraph(inputStream, returnStream.getId(), 0); @@ -1235,13 +1224,8 @@ public class DataStream<OUT> { SinkFunction<OUT> sinkFunction, TypeInformation<OUT> inTypeInfo) { DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", typeInfo); - try { - jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>( - clean(sinkFunction)), inTypeInfo, null, "sink", SerializationUtils - .serialize(clean(sinkFunction)), degreeOfParallelism); - } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize SinkFunction"); - } + jobGraphBuilder.addStreamVertex(returnStream.getId(), new 
SinkInvokable<OUT>( + clean(sinkFunction)), inTypeInfo, null, "sink", degreeOfParallelism); inputStream.connectGraph(inputStream.copy(), returnStream.getId(), 0); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java index 2620d3e..a2c0f89 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java @@ -29,7 +29,7 @@ import org.apache.flink.streaming.partitioner.StreamPartitioner; * partitioned by the given {@link KeySelector}. Operators like {@link #reduce}, * {@link #batchReduce} etc. can be applied on the {@link GroupedDataStream} to * get additional functionality by the grouping. - * + * * @param <OUT> * The output type of the {@link GroupedDataStream}. 
*/ @@ -62,8 +62,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> { */ @Override public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) { - return addFunction("groupReduce", clean(reducer), getType(), getType(), - new GroupedReduceInvokable<OUT>(clean(reducer), keySelector)); + return transform("groupReduce", getType(), new GroupedReduceInvokable<OUT>(clean(reducer), + keySelector)); } /** @@ -182,8 +182,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> { GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(clean(aggregate), keySelector); - SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", clean(aggregate), - typeInfo, typeInfo, invokable); + SingleOutputStreamOperator<OUT, ?> returnStream = transform("groupReduce", typeInfo, + invokable); return returnStream; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java index cc5f66e..e71b18c 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java @@ -83,8 +83,8 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple1<T0>> outType = (TypeInformation<Tuple1<T0>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.addFunction("projection", null, inTypeInfo, outType, - new 
ProjectInvokable<IN, Tuple1<T0>>(fieldIndexes, outType)); + return dataStream.transform("projection", outType, new ProjectInvokable<IN, Tuple1<T0>>( + fieldIndexes, outType)); } /** @@ -111,7 +111,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple2<T0, T1>> outType = (TypeInformation<Tuple2<T0, T1>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.addFunction("projection", null, inTypeInfo, outType, + return dataStream.transform("projection", outType, new ProjectInvokable<IN, Tuple2<T0, T1>>(fieldIndexes, outType)); } @@ -141,7 +141,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple3<T0, T1, T2>> outType = (TypeInformation<Tuple3<T0, T1, T2>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.addFunction("projection", null, inTypeInfo, outType, + return dataStream.transform("projection", outType, new ProjectInvokable<IN, Tuple3<T0, T1, T2>>(fieldIndexes, outType)); } @@ -173,7 +173,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple4<T0, T1, T2, T3>> outType = (TypeInformation<Tuple4<T0, T1, T2, T3>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.addFunction("projection", null, inTypeInfo, outType, + return dataStream.transform("projection", outType, new ProjectInvokable<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, outType)); } @@ -206,14 +206,14 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple5<T0, T1, T2, T3, T4>> outType = (TypeInformation<Tuple5<T0, T1, T2, T3, T4>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.addFunction("projection", null, inTypeInfo, outType, + return dataStream.transform("projection", outType, new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, outType)); } /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. 
Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -227,7 +227,7 @@ public class StreamProjection<IN> { * @param type5 * The class of field '5' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -243,14 +243,14 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>> outType = (TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.addFunction("projection", null, inTypeInfo, outType, + return dataStream.transform("projection", outType, new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, outType)); } /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -266,7 +266,7 @@ public class StreamProjection<IN> { * @param type6 * The class of field '6' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -283,7 +283,7 @@ public class StreamProjection<IN> { TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outType = (TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction("projection", null, inTypeInfo, outType, + .transform("projection", outType, new ProjectInvokable<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes, outType)); } @@ -291,7 +291,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. 
* @param type1 @@ -309,7 +309,7 @@ public class StreamProjection<IN> { * @param type7 * The class of field '7' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -325,7 +325,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outType = (TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.addFunction("projection", null, inTypeInfo, outType, + return dataStream.transform("projection", outType, new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes, outType)); } @@ -333,7 +333,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -353,7 +353,7 @@ public class StreamProjection<IN> { * @param type8 * The class of field '8' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -369,7 +369,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> outType = (TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.addFunction("projection", null, inTypeInfo, outType, + return dataStream.transform("projection", outType, new ProjectInvokable<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes, outType)); } @@ -377,7 +377,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. 
* @param type1 @@ -399,7 +399,7 @@ public class StreamProjection<IN> { * @param type9 * The class of field '9' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -415,7 +415,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> outType = (TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.addFunction("projection", null, inTypeInfo, outType, + return dataStream.transform("projection", outType, new ProjectInvokable<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>( fieldIndexes, outType)); } @@ -423,7 +423,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -447,7 +447,7 @@ public class StreamProjection<IN> { * @param type10 * The class of field '10' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -465,7 +465,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> outType = (TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.addFunction("projection", null, inTypeInfo, outType, + return dataStream.transform("projection", outType, new ProjectInvokable<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>( fieldIndexes, outType)); } @@ -473,7 +473,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. 
- * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -499,7 +499,7 @@ public class StreamProjection<IN> { * @param type11 * The class of field '11' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -518,10 +518,8 @@ public class StreamProjection<IN> { TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> outType = (TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, outType, new ProjectInvokable<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>( fieldIndexes, outType)); @@ -530,7 +528,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -558,7 +556,7 @@ public class StreamProjection<IN> { * @param type12 * The class of field '12' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -577,10 +575,8 @@ public class StreamProjection<IN> { TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> outType = (TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, outType, new ProjectInvokable<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>( fieldIndexes, outType)); @@ -589,7 +585,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. 
- * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -619,7 +615,7 @@ public class StreamProjection<IN> { * @param type13 * The class of field '13' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -638,10 +634,8 @@ public class StreamProjection<IN> { TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> outType = (TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, outType, new ProjectInvokable<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>( fieldIndexes, outType)); @@ -650,7 +644,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -682,7 +676,7 @@ public class StreamProjection<IN> { * @param type14 * The class of field '14' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -702,10 +696,8 @@ public class StreamProjection<IN> { TypeInformation<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> outType = (TypeInformation<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, outType, new ProjectInvokable<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>( fieldIndexes, outType)); @@ -714,7 +706,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. 
- * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -748,7 +740,7 @@ public class StreamProjection<IN> { * @param type15 * The class of field '15' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -768,10 +760,8 @@ public class StreamProjection<IN> { TypeInformation<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> outType = (TypeInformation<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, outType, new ProjectInvokable<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>( fieldIndexes, outType)); @@ -780,7 +770,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -816,7 +806,7 @@ public class StreamProjection<IN> { * @param type16 * The class of field '16' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -836,10 +826,8 @@ public class StreamProjection<IN> { TypeInformation<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> outType = (TypeInformation<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, outType, new ProjectInvokable<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>( fieldIndexes, outType)); @@ -848,7 +836,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. 
Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -886,7 +874,7 @@ public class StreamProjection<IN> { * @param type17 * The class of field '17' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -906,10 +894,8 @@ public class StreamProjection<IN> { TypeInformation<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> outType = (TypeInformation<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, outType, new ProjectInvokable<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>( fieldIndexes, outType)); @@ -918,7 +904,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -958,7 +944,7 @@ public class StreamProjection<IN> { * @param type18 * The class of field '18' of the result Tuples. * @return The projected DataStream. 
- * + * * @see Tuple * @see DataStream */ @@ -979,10 +965,8 @@ public class StreamProjection<IN> { TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> outType = (TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, outType, new ProjectInvokable<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>( fieldIndexes, outType)); @@ -991,7 +975,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -1033,7 +1017,7 @@ public class StreamProjection<IN> { * @param type19 * The class of field '19' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -1054,10 +1038,8 @@ public class StreamProjection<IN> { TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> outType = (TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, outType, new ProjectInvokable<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>( fieldIndexes, outType)); @@ -1066,7 +1048,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. 
* @param type1 @@ -1110,7 +1092,7 @@ public class StreamProjection<IN> { * @param type20 * The class of field '20' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -1132,10 +1114,8 @@ public class StreamProjection<IN> { TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> outType = (TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, outType, new ProjectInvokable<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>( fieldIndexes, outType)); @@ -1144,7 +1124,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -1190,7 +1170,7 @@ public class StreamProjection<IN> { * @param type21 * The class of field '21' of the result Tuples. * @return The projected DataStream. 
- * + * * @see Tuple * @see DataStream */ @@ -1212,10 +1192,8 @@ public class StreamProjection<IN> { TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> outType = (TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, outType, new ProjectInvokable<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>( fieldIndexes, outType)); @@ -1224,7 +1202,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -1272,7 +1250,7 @@ public class StreamProjection<IN> { * @param type22 * The class of field '22' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -1295,10 +1273,8 @@ public class StreamProjection<IN> { TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> outType = (TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, outType, new ProjectInvokable<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>( fieldIndexes, outType)); @@ -1307,7 +1283,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. 
Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -1357,7 +1333,7 @@ public class StreamProjection<IN> { * @param type23 * The class of field '23' of the result Tuples. * @return The projected DataStream. - * + * * @see Tuple * @see DataStream */ @@ -1380,10 +1356,8 @@ public class StreamProjection<IN> { TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> outType = (TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, outType, new ProjectInvokable<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>( fieldIndexes, outType)); @@ -1392,7 +1366,7 @@ public class StreamProjection<IN> { /** * Projects a {@link Tuple} {@link DataStream} to the previously selected * fields. Requires the classes of the fields of the resulting Tuples. - * + * * @param type0 * The class of field '0' of the result Tuples. * @param type1 @@ -1444,7 +1418,7 @@ public class StreamProjection<IN> { * @param type24 * The class of field '24' of the result Tuples. * @return The projected DataStream. 
- * + * * @see Tuple * @see DataStream */ @@ -1467,10 +1441,9 @@ public class StreamProjection<IN> { TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> outType = (TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .addFunction( + .transform( "projection", - null, - inTypeInfo, + outType, new ProjectInvokable<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>( fieldIndexes, outType)); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index 788f28d..cb9cd04 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -50,7 +50,7 @@ import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy; * into windows (predefined chunks). User defined function such as * {@link #reduce(ReduceFunction)}, {@link #reduceGroup(GroupReduceFunction)} or * aggregations can be applied to the windows. 
- * + * * @param <OUT> * The output type of the {@link WindowedDataStream} */ @@ -124,8 +124,8 @@ public class WindowedDataStream<OUT> { this.userEvicters = windowedDataStream.userEvicters; this.allCentral = windowedDataStream.allCentral; } - - public <F> F clean(F f){ + + public <F> F clean(F f) { return dataStream.clean(f); } @@ -231,7 +231,7 @@ public class WindowedDataStream<OUT> { * @return The transformed DataStream */ public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) { - return dataStream.addFunction("NextGenWindowReduce", clean(reduceFunction), getType(), getType(), + return dataStream.transform("NextGenWindowReduce", getType(), getReduceInvokable(reduceFunction)); } @@ -255,7 +255,7 @@ public class WindowedDataStream<OUT> { TypeInformation<R> outType = TypeExtractor .getGroupReduceReturnTypes(reduceFunction, inType); - return dataStream.addFunction("NextGenWindowReduce", clean(reduceFunction), inType, outType, + return dataStream.transform("NextGenWindowReduce", outType, getReduceGroupInvokable(reduceFunction)); } @@ -457,8 +457,8 @@ public class WindowedDataStream<OUT> { private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregator) { StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator); - SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("windowReduce", - clean(aggregator), getType(), getType(), invokable); + SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.transform("windowReduce", + getType(), invokable); return returnStream; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index d26b714..f50ab91 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -22,8 +22,6 @@ import java.io.Serializable; import java.util.Collection; import java.util.List; -import org.apache.commons.lang3.SerializationException; -import org.apache.commons.lang3.SerializationUtils; import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; @@ -264,14 +262,10 @@ public abstract class StreamExecutionEnvironment { DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements", outTypeInfo); - try { - SourceFunction<OUT> function = new FromElementsFunction<OUT>(data); - jobGraphBuilder.addStreamVertex(returnStream.getId(), - new SourceInvokable<OUT>(function), null, outTypeInfo, "source", - SerializationUtils.serialize(function), 1); - } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize elements"); - } + SourceFunction<OUT> function = new FromElementsFunction<OUT>(data); + jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function), + null, outTypeInfo, "source", 1); + return returnStream; } @@ -300,15 +294,8 @@ public abstract class StreamExecutionEnvironment { DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "collection", outTypeInfo); - try { - SourceFunction<OUT> function = new FromElementsFunction<OUT>(data); - - jobGraphBuilder.addStreamVertex(returnStream.getId(), 
new SourceInvokable<OUT>( - new FromElementsFunction<OUT>(data)), null, outTypeInfo, "source", - SerializationUtils.serialize(function), 1); - } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize collection"); - } + jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>( + new FromElementsFunction<OUT>(data)), null, outTypeInfo, "source", 1); return returnStream; } @@ -317,7 +304,7 @@ public abstract class StreamExecutionEnvironment { * Creates a new DataStream that contains the strings received infinitely * from socket. Received strings are decoded by the system's default * character set. - * + * * @param hostname * The host name which a server socket bind. * @param port @@ -335,7 +322,7 @@ public abstract class StreamExecutionEnvironment { * Creates a new DataStream that contains the strings received infinitely * from socket. Received strings are decoded by the system's default * character set, uses '\n' as delimiter. - * + * * @param hostname * The host name which a server socket bind. 
* @param port @@ -378,12 +365,8 @@ public abstract class StreamExecutionEnvironment { DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo); - try { - jobGraphBuilder.addSourceVertex(returnStream.getId(), function, null, outTypeInfo, - "source", SerializationUtils.serialize(function), getDegreeOfParallelism()); - } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize SourceFunction"); - } + jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function), + null, outTypeInfo, "source", 1); return returnStream; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java index cb960dd..8afac75 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java @@ -1,45 +1,49 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. - */ - -package org.apache.flink.streaming.api.function.source; - -import java.util.Arrays; -import java.util.Collection; - -import org.apache.flink.util.Collector; - -public class FromElementsFunction<T> implements SourceFunction<T> { - private static final long serialVersionUID = 1L; - - Iterable<T> iterable; - - public FromElementsFunction(T... 
elements) { - this.iterable = Arrays.asList(elements); - } - - public FromElementsFunction(Collection<T> elements) { - this.iterable = elements; - } - - @Override - public void invoke(Collector<T> collector) throws Exception { - for (T element : iterable) { - collector.collect(element); - } - } - -} + */ + +package org.apache.flink.streaming.api.function.source; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.flink.util.Collector; + +public class FromElementsFunction<T> implements SourceFunction<T> { + private static final long serialVersionUID = 1L; + + Iterable<T> iterable; + + public FromElementsFunction(T... elements) { + this.iterable = Arrays.asList(elements); + } + + public FromElementsFunction(Collection<T> elements) { + this.iterable = elements; + } + + public FromElementsFunction(Iterable<T> elements) { + this.iterable = elements; + } + + @Override + public void invoke(Collector<T> collector) throws Exception { + for (T element : iterable) { + collector.collect(element); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index 7504efd..d786d6b 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -37,7 +37,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa protected int instanceID; 
protected String name; private static int numVertices = 0; - protected Object function; + protected String functionName; private InputHandler<IN> inputHandler; @@ -72,7 +72,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa this.configuration = new StreamConfig(getTaskConfiguration()); this.name = configuration.getVertexName(); this.functionName = configuration.getFunctionName(); - this.function = configuration.getFunction(userClassLoader); this.states = configuration.getOperatorStates(userClassLoader); this.context = createRuntimeContext(name, this.states); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-scala/pom.xml ---------------------------------------------------------------------- diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml index b902655..4ccbb98 100644 --- a/flink-scala/pom.xml +++ b/flink-scala/pom.xml @@ -50,6 +50,12 @@ under the License. <artifactId>flink-compiler</artifactId> <version>${project.version}</version> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-core</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.scala-lang</groupId> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala new file mode 100644 index 0000000..711ce7c --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.streaming +import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } +import org.apache.flink.api.scala.ClosureCleaner +import org.apache.flink.api.common.typeinfo.TypeInformation +import scala.reflect.ClassTag +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.streaming.api.invokable.operator.MapInvokable + +class DataStream[OUT](javaStream: JavaStream[OUT]) { + + /* This code is originally from the Apache Spark project. */ + /** + * Clean a closure to make it ready to serialized and send to tasks + * (removes unreferenced variables in $outer's, updates REPL variables) + * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively + * check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt> + * if not. 
+ * + * @param f the closure to clean + * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability + * @throws <tt>SparkException<tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not + * serializable + */ + private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { + ClosureCleaner.clean(f, checkSerializable) + f + } + + /** + * Creates a new DataStream by applying the given function to every element of this DataStream. + */ + def map[R: TypeInformation: ClassTag](fun: OUT => R): DataStream[R] = { + if (fun == null) { + throw new NullPointerException("Map function must not be null.") + } + val mapper = new MapFunction[OUT, R] { + val cleanFun = clean(fun) + def map(in: OUT): R = cleanFun(in) + } + + new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[OUT, R](mapper))) + } + + /** + * Creates a new DataStream by applying the given function to every element of this DataStream. + */ + def map[R: TypeInformation: ClassTag](mapper: MapFunction[OUT, R]): DataStream[R] = { + if (mapper == null) { + throw new NullPointerException("Map function must not be null.") + } + + new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[OUT, R](mapper))) + } + + def print() = javaStream.print() + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala new file mode 100644 index 0000000..df6c561 --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala @@ -0,0 +1,150 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.scala.streaming

import org.apache.commons.lang.Validate
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.{ StreamExecutionEnvironment => JavaEnv }
import org.apache.flink.streaming.api.function.source.FromElementsFunction
import org.apache.flink.streaming.api.invokable.SourceInvokable

import scala.reflect.ClassTag

/**
 * The context in which a streaming program is executed. This Scala wrapper
 * delegates to the underlying Java API [[JavaEnv]].
 */
class StreamExecutionEnvironment(javaEnv: JavaEnv) {

  /**
   * Sets the degree of parallelism (DOP) for operations executed through this environment.
   * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
   * x parallel instances. This value can be overridden by specific operations using
   * [[DataStream.setParallelism]].
   */
  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
  }

  /**
   * Returns the default degree of parallelism for this execution environment. Note that this
   * value can be overridden by individual operations using [[DataStream.setParallelism]].
   */
  def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism

  /**
   * Creates a DataStream that contains the sequence of numbers from `from` to `to` (as
   * produced by the underlying Java environment).
   */
  def generateSequence(from: Long, to: Long): DataStream[java.lang.Long] = {
    new DataStream(javaEnv.generateSequence(from, to))
  }

  /**
   * Creates a new data stream that contains the given elements. The elements must all be of the
   * same type and must be serializable.
   *
   * Note that this operation will result in a non-parallel data source, i.e. a data source with
   * a degree of parallelism of one.
   */
  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
    // Delegate to fromCollection; the ClassTag and TypeInformation implicits
    // in scope here are resolved automatically.
    fromCollection(data)
  }

  /**
   * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
   * because the framework may move the elements into the cluster if needed.
   *
   * Note that this operation will result in a non-parallel data source, i.e. a data source with
   * a degree of parallelism of one.
   */
  def fromCollection[T: ClassTag: TypeInformation](data: Seq[T]): DataStream[T] = {
    Validate.notNull(data, "Data must not be null.")
    // The contract above requires a non-empty Seq; fail fast at construction
    // time rather than at job runtime.
    Validate.isTrue(data.nonEmpty, "Data must not be empty.")
    val typeInfo = implicitly[TypeInformation[T]]
    val returnStream = new DataStreamSource[T](javaEnv, "elements", typeInfo)

    javaEnv.getJobGraphBuilder.addStreamVertex(
      returnStream.getId(),
      new SourceInvokable[T](
        new FromElementsFunction[T](scala.collection.JavaConversions.asJavaCollection(data))),
      null, typeInfo, "source", 1)
    new DataStream(returnStream)
  }

  /**
   * Triggers the execution of the streaming program.
   */
  def execute() = javaEnv.execute()

}

object StreamExecutionEnvironment {

  /**
   * Creates an execution environment that represents the context in which the program is
   * currently executed. If the program is invoked standalone, this method returns a local
   * execution environment. If the program is invoked from within the command line client
   * to be submitted to a cluster, this method returns the execution environment of this cluster.
   */
  def getExecutionEnvironment: StreamExecutionEnvironment = {
    new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
  }

  /**
   * Creates a local execution environment. The local execution environment will run the program in
   * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
   * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
   */
  def createLocalEnvironment(
    degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()): StreamExecutionEnvironment = {
    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
  }

  /**
   * Creates a remote execution environment. The remote environment sends (parts of) the program to
   * a cluster for execution. Note that all file paths used in the program must be accessible from
   * the cluster. The execution will use the cluster's default degree of parallelism, unless the
   * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism]].
   *
   * @param host The host name or address of the master (JobManager),
   *             where the program should be executed.
   * @param port The port of the master (JobManager), where the program should be executed.
   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
   *                 program uses user-defined functions, user-defined input formats, or any
   *                 libraries, those must be provided in the JAR files.
   */
  def createRemoteEnvironment(
    host: String,
    port: Int,
    jarFiles: String*): StreamExecutionEnvironment = {
    new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
  }

  /**
   * Creates a remote execution environment. The remote environment sends (parts of) the program
   * to a cluster for execution. Note that all file paths used in the program must be accessible
   * from the cluster. The execution will use the specified degree of parallelism.
   *
   * @param host The host name or address of the master (JobManager),
   *             where the program should be executed.
   * @param port The port of the master (JobManager), where the program should be executed.
   * @param degreeOfParallelism The degree of parallelism to use during the execution.
   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
   *                 program uses user-defined functions, user-defined input formats, or any
   *                 libraries, those must be provided in the JAR files.
   */
  def createRemoteEnvironment(
    host: String,
    port: Int,
    degreeOfParallelism: Int,
    jarFiles: String*): StreamExecutionEnvironment = {
    val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
    new StreamExecutionEnvironment(javaEnv)
  }
}