[streaming] StreamInvokable rework for simpler logic and easier use
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/88f38e49 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/88f38e49 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/88f38e49 Branch: refs/heads/release-0.8 Commit: 88f38e49202354926cb4ec36390cbe34bad247a3 Parents: 15fb1da Author: Gyula Fora <[email protected]> Authored: Wed Dec 17 23:34:26 2014 +0100 Committer: mbalassi <[email protected]> Committed: Thu Dec 18 18:52:48 2014 +0100 ---------------------------------------------------------------------- docs/streaming_guide.md | 10 - .../flink/streaming/api/JobGraphBuilder.java | 8 - .../flink/streaming/api/StreamConfig.java | 13 +- .../streaming/api/datastream/DataStream.java | 9 +- .../datastream/SingleOutputStreamOperator.java | 18 -- .../streaming/api/invokable/SinkInvokable.java | 14 +- .../api/invokable/SourceInvokable.java | 22 +- .../api/invokable/StreamInvokable.java | 87 +++----- .../invokable/operator/CounterInvokable.java | 23 +- .../api/invokable/operator/FilterInvokable.java | 24 +- .../invokable/operator/FlatMapInvokable.java | 14 +- .../operator/GroupedReduceInvokable.java | 4 +- .../operator/GroupedWindowInvokable.java | 76 +++---- .../api/invokable/operator/MapInvokable.java | 14 +- .../invokable/operator/ProjectInvokable.java | 11 +- .../operator/StreamReduceInvokable.java | 14 +- .../api/invokable/operator/WindowInvokable.java | 29 +-- .../operator/co/CoBatchReduceInvokable.java | 9 +- .../api/invokable/operator/co/CoInvokable.java | 45 +--- .../operator/co/CoWindowInvokable.java | 5 - .../api/streamvertex/CoStreamVertex.java | 37 +++- .../api/streamvertex/StreamTaskContext.java | 40 ++++ .../api/streamvertex/StreamVertex.java | 48 +++- .../streaming/api/AggregationFunctionTest.java | 36 +-- .../api/collector/DirectedOutputTest.java | 2 +- .../invokable/operator/CoBatchReduceTest.java | 6 +- .../api/invokable/operator/CoFlatMapTest.java | 4 +- .../operator/CoGroupedBatchReduceTest.java | 6 +- .../invokable/operator/CoGroupedReduceTest.java | 6 +- .../operator/CoGroupedWindowReduceTest.java | 6 +- .../api/invokable/operator/CoMapTest.java | 4 +- .../invokable/operator/CoStreamReduceTest.java | 4 +- .../invokable/operator/CoWindowReduceTest.java | 6 +- .../api/invokable/operator/CoWindowTest.java | 6 +- .../operator/CounterInvokableTest.java | 4 +- .../api/invokable/operator/FilterTest.java | 4 +- .../api/invokable/operator/FlatMapTest.java | 4 +- .../operator/GroupedReduceInvokableTest.java | 4 +- .../operator/GroupedWindowInvokableTest.java | 16 +- .../api/invokable/operator/MapTest.java | 4 +- .../api/invokable/operator/ProjectTest.java | 4 +- .../invokable/operator/StreamReduceTest.java | 4 +- .../invokable/operator/WindowInvokableTest.java | 10 +- .../flink/streaming/util/MockCoContext.java | 217 +++++++++++++++++++ .../flink/streaming/util/MockCoInvokable.java | 169 --------------- .../flink/streaming/util/MockContext.java | 148 +++++++++++++ .../flink/streaming/util/MockInvokable.java | 105 --------- 47 files changed, 665 insertions(+), 688 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/docs/streaming_guide.md ---------------------------------------------------------------------- diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md index 6e7f932..c51afbc 100644 --- a/docs/streaming_guide.md +++ b/docs/streaming_guide.md @@ -609,16 +609,6 @@ env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis); To maximise the throughput the user can call `.setBufferTimeout(-1)` which will remove the timeout and buffers will only be flushed when they are full. To minimise latency, set the timeout to a value close to 0 (fro example 5 or 10 ms). Theoretically a buffer timeout of 0 will cause all outputs to be flushed when produced, but this setting should be avoided because it can cause severe performance degradation. -### Mutability - -This is currently a beta feature and it is only supported for a subset of the available operators. - -Most operators allow setting mutability for reading input data. If the operator is set mutable then the variable used to store input data for operators will be reused in a mutable fashion to avoid excessive object creation. By default, all operators are set to immutable. -Usage: - -~~~java -operator.setMutability(isMutable) -~~~ [Back to top](#top) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java index d63042a..d66e388 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java @@ -64,7 +64,6 @@ public class JobGraphBuilder { private Map<String, List<Integer>> outEdgeType; private Map<String, List<List<String>>> outEdgeNames; private Map<String, List<Boolean>> outEdgeSelectAll; - private Map<String, Boolean> mutability; private Map<String, List<String>> inEdgeList; private Map<String, List<StreamPartitioner<?>>> connectionTypes; private Map<String, String> operatorNames; @@ -97,7 +96,6 @@ public class JobGraphBuilder { outEdgeType = new HashMap<String, List<Integer>>(); outEdgeNames = new HashMap<String, List<List<String>>>(); outEdgeSelectAll = new HashMap<String, List<Boolean>>(); - mutability = new HashMap<String, Boolean>(); inEdgeList = new HashMap<String, List<String>>(); connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>(); operatorNames = new HashMap<String, String>(); @@ -302,7 +300,6 @@ public class JobGraphBuilder { vertexClasses.put(vertexName, vertexClass); setParallelism(vertexName, parallelism); - mutability.put(vertexName, false); invokableObjects.put(vertexName, invokableObject); operatorNames.put(vertexName, operatorName); serializedFunctions.put(vertexName, serializedFunction); @@ -355,7 +352,6 @@ public class JobGraphBuilder { StreamConfig config = new StreamConfig(vertex.getConfiguration()); - config.setMutability(mutability.get(vertexName)); config.setBufferTimeout(bufferTimeout.get(vertexName)); config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName)); @@ -447,10 +443,6 @@ public class JobGraphBuilder { inputFormatList.put(vertexName, inputFormat); } - public void setMutability(String vertexName, boolean isMutable) { - mutability.put(vertexName, isMutable); - } - public void setBufferTimeout(String vertexName, long bufferTimeout) { this.bufferTimeout.put(vertexName, bufferTimeout); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index 1d863a7..9800b63 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -57,14 +57,11 @@ public class StreamConfig { private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2"; private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1"; private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2"; - private static final String MUTABILITY = "isMutable"; private static final String ITERATON_WAIT = "iterationWait"; // DEFAULT VALUES - private static final boolean DEFAULT_IS_MUTABLE = false; - - private static final long DEFAULT_TIMEOUT = 0; + private static final long DEFAULT_TIMEOUT = 100; // CONFIG METHODS @@ -138,14 +135,6 @@ public class StreamConfig { config.setBytes(key, SerializationUtils.serialize(typeWrapper)); } - public void setMutability(boolean isMutable) { - config.setBoolean(MUTABILITY, isMutable); - } - - public boolean getMutability() { - return config.getBoolean(MUTABILITY, DEFAULT_IS_MUTABLE); - } - public void setBufferTimeout(long timeout) { config.setLong(BUFFER_TIMEOUT, timeout); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 474d57b..6e8da0a 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -366,7 +366,7 @@ public class DataStream<OUT> { * the data stream that will be fed back and used as the input for the * iteration head. A common usage pattern for streaming iterations is to use * output splitting to send a part of the closing data stream to the head. - * Refer to {@link SingleOutputStreamOperator#split(OutputSelector)} for + * Refer to {@link SingleOutputStreamOperator#split(outputSelector)} for * more information. * <p> * The iteration edge will be partitioned the same way as the first input of @@ -940,7 +940,6 @@ public class DataStream<OUT> { WriteFormatAsText<OUT> format, long millis, OUT endTuple) { DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>( path, format, millis, endTuple), inputStream.typeInfo); - jobGraphBuilder.setMutability(returnStream.getId(), false); return returnStream; } @@ -968,7 +967,6 @@ public class DataStream<OUT> { DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.typeInfo); - jobGraphBuilder.setMutability(returnStream.getId(), false); return returnStream; } @@ -1063,9 +1061,6 @@ public class DataStream<OUT> { * @return The closed DataStream */ public DataStreamSink<OUT> writeAsCsv(String path, int batchSize, OUT endTuple) { - if (this instanceof SingleOutputStreamOperator) { - ((SingleOutputStreamOperator<?, ?>) this).setMutability(false); - } return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, endTuple); } @@ -1091,7 +1086,6 @@ public class DataStream<OUT> { WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) { DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>( path, format, millis, endTuple), inputStream.typeInfo); - jobGraphBuilder.setMutability(returnStream.getId(), false); return returnStream; } @@ -1119,7 +1113,6 @@ public class DataStream<OUT> { DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.typeInfo); - jobGraphBuilder.setMutability(returnStream.getId(), false); return returnStream; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 3e1c940..016322b 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -71,24 +71,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato return this; } - /** - * This is a beta feature, use with care </br><br/> - * Sets the mutability of the operator. If the operator is set to mutable, - * the tuples received in the user defined functions, will be reused after - * the function call. Setting an operator to mutable reduces garbage - * collection overhead and thus increases scalability. Please note that if a - * {@link DataStream#batchReduce} or {@link DataStream#windowReduce} is used - * as mutable, the user can only iterate through the iterator once in every - * invoke. - * - * @param isMutable - * The mutability of the operator. - * @return The operator with mutability set. - */ - public SingleOutputStreamOperator<OUT, O> setMutability(boolean isMutable) { - jobGraphBuilder.setMutability(id, isMutable); - return this; - } /** * Sets the maximum time frequency (ms) for the flushing of the output http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java index ec33224..74591a8 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java @@ -30,23 +30,15 @@ public class SinkInvokable<IN> extends StreamInvokable<IN, IN> { } @Override - protected void immutableInvoke() throws Exception { - while ((reuse = recordIterator.next(reuse)) != null) { - callUserFunctionAndLogException(); - resetReuse(); - } - } - - @Override - protected void mutableInvoke() throws Exception { - while ((reuse = recordIterator.next(reuse)) != null) { + public void invoke() throws Exception { + while (readNext() != null) { callUserFunctionAndLogException(); } } @Override protected void callUserFunction() throws Exception { - sinkFunction.invoke((IN) reuse.getObject()); + sinkFunction.invoke((IN) nextRecord.getObject()); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java index 0cfe028..f1cf2c5 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java @@ -21,38 +21,24 @@ import java.io.Serializable; import org.apache.flink.streaming.api.function.source.SourceFunction; -public class SourceInvokable<OUT> extends StreamInvokable<OUT,OUT> implements Serializable { +public class SourceInvokable<OUT> extends StreamInvokable<OUT, OUT> implements Serializable { private static final long serialVersionUID = 1L; private SourceFunction<OUT> sourceFunction; - public SourceInvokable(SourceFunction<OUT> sourceFunction) { super(sourceFunction); this.sourceFunction = sourceFunction; } @Override - public void invoke() throws Exception { - sourceFunction.invoke(collector); - } - - @Override - protected void immutableInvoke() throws Exception { - } - - @Override - protected void mutableInvoke() throws Exception { + public void invoke() { + callUserFunctionAndLogException(); } @Override protected void callUserFunction() throws Exception { + sourceFunction.invoke(collector); } - - @Override - public SourceFunction<OUT> getSourceFunction(){ - return sourceFunction; - } - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java index e587b93..d19d7ad 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java @@ -17,16 +17,16 @@ package org.apache.flink.streaming.api.invokable; +import java.io.IOException; import java.io.Serializable; import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.function.source.SourceFunction; import org.apache.flink.streaming.api.streamrecord.StreamRecord; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.api.streamvertex.StreamTaskContext; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.apache.flink.util.StringUtils; @@ -45,9 +45,11 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(StreamInvokable.class); + protected StreamTaskContext<OUT> taskContext; + protected MutableObjectIterator<StreamRecord<IN>> recordIterator; protected StreamRecordSerializer<IN> inSerializer; - protected StreamRecord<IN> reuse; + protected StreamRecord<IN> nextRecord; protected boolean isMutable; protected Collector<OUT> collector; @@ -61,48 +63,43 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { /** * Initializes the {@link StreamInvokable} for input and output handling * - * @param collector - * Collector object for collecting the outputs for the operator - * @param recordIterator - * Iterator for reading in the input records - * @param serializer - * Serializer used to deserialize inputs - * @param isMutable - * Mutability setting for the operator + * @param taskContext + * StreamTaskContext representing the vertex */ - public void initialize(Collector<OUT> collector, - MutableObjectIterator<StreamRecord<IN>> recordIterator, - StreamRecordSerializer<IN> serializer, boolean isMutable) { - this.collector = collector; - this.recordIterator = recordIterator; - this.inSerializer = serializer; + public void setup(StreamTaskContext<OUT> taskContext) { + this.collector = taskContext.getOutputCollector(); + this.recordIterator = taskContext.getInput(0); + this.inSerializer = taskContext.getInputSerializer(0); if (this.inSerializer != null) { - this.reuse = serializer.createInstance(); + this.nextRecord = inSerializer.createInstance(); } - this.isMutable = isMutable; - } - - /** - * Re-initializes the object in which the next input record will be read in - */ - protected void resetReuse() { - this.reuse = inSerializer.createInstance(); + this.taskContext = taskContext; } /** - * Method that will be called if the mutability setting is set to immutable + * Method that will be called when the operator starts, should encode the + * processing logic */ - protected abstract void immutableInvoke() throws Exception; + public abstract void invoke() throws Exception; - /** - * Method that will be called if the mutability setting is set to mutable + /* + * Reads the next record from the reader iterator and stores it in the + * nextRecord variable */ - protected abstract void mutableInvoke() throws Exception; + protected StreamRecord<IN> readNext() { + this.nextRecord = inSerializer.createInstance(); + try { + return nextRecord = recordIterator.next(nextRecord); + } catch (IOException e) { + throw new RuntimeException("Could not read next record."); + } + } /** * The call of the user implemented function should be implemented here */ - protected abstract void callUserFunction() throws Exception; + protected void callUserFunction() throws Exception { + } /** * Method for logging exceptions thrown during the user function call @@ -119,20 +116,6 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { } /** - * Method that will be called when the stream starts. The user should encode - * the processing functionality in {@link #mutableInvoke()} and - * {@link #immutableInvoke()} - * - */ - public void invoke() throws Exception { - if (this.isMutable) { - mutableInvoke(); - } else { - immutableInvoke(); - } - } - - /** * Open method to be used if the user defined function extends the * RichFunction class * @@ -141,9 +124,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { */ public void open(Configuration parameters) throws Exception { isRunning = true; - if (userFunction instanceof RichFunction) { - ((RichFunction) userFunction).open(parameters); - } + FunctionUtils.openFunction(userFunction, parameters); } /** @@ -154,16 +135,10 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { public void close() throws Exception { isRunning = false; collector.close(); - if (userFunction instanceof RichFunction) { - ((RichFunction) userFunction).close(); - } + FunctionUtils.closeFunction(userFunction); } public void setRuntimeContext(RuntimeContext t) { FunctionUtils.setFunctionRuntimeContext(userFunction, t); } - - public SourceFunction<OUT> getSourceFunction() { - return null; - } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java index 7924595..0267253 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java @@ -21,30 +21,17 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable; public class CounterInvokable<IN> extends StreamInvokable<IN, Long> { private static final long serialVersionUID = 1L; - + Long count = 0L; - + public CounterInvokable() { super(null); } - - @Override - protected void immutableInvoke() throws Exception { - while ((reuse = recordIterator.next(reuse)) != null) { - callUserFunctionAndLogException(); - resetReuse(); - } - } @Override - protected void mutableInvoke() throws Exception { - while ((reuse = recordIterator.next(reuse)) != null) { - callUserFunctionAndLogException(); + public void invoke() throws Exception { + while (readNext() != null) { + collector.collect(++count); } } - - @Override - protected void callUserFunction() throws Exception { - collector.collect(++count); - } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java index a54b6ad..796196d 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java @@ -25,37 +25,25 @@ public class FilterInvokable<IN> extends StreamInvokable<IN, IN> { private static final long serialVersionUID = 1L; FilterFunction<IN> filterFunction; + private boolean collect; public FilterInvokable(FilterFunction<IN> filterFunction) { super(filterFunction); this.filterFunction = filterFunction; } - private boolean canCollect; - @Override - protected void immutableInvoke() throws Exception { - while ((reuse = recordIterator.next(reuse)) != null) { + public void invoke() throws Exception { + while (readNext() != null) { callUserFunctionAndLogException(); - if (canCollect) { - collector.collect(reuse.getObject()); - } - resetReuse(); - } - } - - @Override - protected void mutableInvoke() throws Exception { - while ((reuse = recordIterator.next(reuse)) != null) { - callUserFunctionAndLogException(); - if (canCollect) { - collector.collect(reuse.getObject()); + if (collect) { + collector.collect(nextRecord.getObject()); } } } @Override protected void callUserFunction() throws Exception { - canCollect = filterFunction.filter(reuse.getObject()); + collect = filterFunction.filter(nextRecord.getObject()); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java index 3452a82..8ff78eb 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java @@ -31,23 +31,15 @@ public class FlatMapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { } @Override - protected void immutableInvoke() throws Exception { - while ((reuse = recordIterator.next(reuse)) != null) { - callUserFunctionAndLogException(); - resetReuse(); - } - } - - @Override - protected void mutableInvoke() throws Exception { - while ((reuse = recordIterator.next(reuse)) != null) { + public void invoke() throws Exception { + while (readNext() != null) { callUserFunctionAndLogException(); } } @Override protected void callUserFunction() throws Exception { - flatMapper.flatMap(reuse.getObject(), collector); + flatMapper.flatMap(nextRecord.getObject(), collector); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java index d64fb6f..fdcf520 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java @@ -38,9 +38,9 @@ public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> { @Override protected void reduce() throws Exception { - Object key = reuse.getKey(keySelector); + Object key = nextRecord.getKey(keySelector); currentValue = values.get(key); - nextValue = reuse.getObject(); + nextValue = nextRecord.getObject(); if (currentValue != null) { callUserFunctionAndLogException(); values.put(key, reduced); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java index ae16be0..33348e4 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java @@ -37,8 +37,6 @@ import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This invokable allows windowing based on {@link TriggerPolicy} and @@ -80,8 +78,6 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { */ private static final long serialVersionUID = -3469545957144404137L; - private static final Logger LOG = LoggerFactory.getLogger(GroupedWindowInvokable.class); - private KeySelector<IN, ?> keySelector; private Configuration parameters; private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies; @@ -226,23 +222,23 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { } @Override - protected void immutableInvoke() throws Exception { + public void invoke() throws Exception { // Prevent empty data streams - if ((reuse = recordIterator.next(reuse)) == null) { + if (readNext() == null) { throw new RuntimeException("DataStream must not be empty"); } // Continuously run - while (reuse != null) { - WindowInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector.getKey(reuse - .getObject())); + while (nextRecord != null) { + WindowInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector + .getKey(nextRecord.getObject())); if (groupInvokable == null) { - groupInvokable = makeNewGroup(reuse); + groupInvokable = makeNewGroup(nextRecord); } // Run the precalls for central active triggers for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) { - Object[] result = trigger.preNotifyTrigger(reuse.getObject()); + Object[] result = trigger.preNotifyTrigger(nextRecord.getObject()); for (Object in : result) { // If central eviction is used, handle it here @@ -260,7 +256,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { // Process non-active central triggers for (TriggerPolicy<IN> triggerPolicy : centralTriggerPolicies) { - if (triggerPolicy.notifyTrigger(reuse.getObject())) { + if (triggerPolicy.notifyTrigger(nextRecord.getObject())) { currentTriggerPolicies.add(triggerPolicy); } } @@ -268,12 +264,12 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { if (currentTriggerPolicies.isEmpty()) { // only add the element to its group - groupInvokable.processRealElement(reuse.getObject()); + groupInvokable.processRealElement(nextRecord.getObject()); checkForEmptyGroupBuffer(groupInvokable); // If central eviction is used, handle it here if (!centralEvictionPolicies.isEmpty()) { - evictElements(centralEviction(reuse.getObject(), false)); + evictElements(centralEviction(nextRecord.getObject(), false)); deleteOrderForCentralEviction.add(groupInvokable); } @@ -283,20 +279,21 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { for (WindowInvokable<IN, OUT> group : windowingGroups.values()) { if (group == groupInvokable) { // process real with initialized policies - group.processRealElement(reuse.getObject(), currentTriggerPolicies); + group.processRealElement(nextRecord.getObject(), currentTriggerPolicies); } else { // process like a fake but also initialized with // policies - group.externalTriggerFakeElement(reuse.getObject(), currentTriggerPolicies); + group.externalTriggerFakeElement(nextRecord.getObject(), + currentTriggerPolicies); } - - //remove group in case it has an empty buffer - //checkForEmptyGroupBuffer(group); + + // remove group in case it has an empty buffer + // checkForEmptyGroupBuffer(group); } // If central eviction is used, handle it here if (!centralEvictionPolicies.isEmpty()) { - evictElements(centralEviction(reuse.getObject(), true)); + evictElements(centralEviction(nextRecord.getObject(), true)); deleteOrderForCentralEviction.add(groupInvokable); } } @@ -304,9 +301,8 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { // clear current trigger list currentTriggerPolicies.clear(); - // Recreate the reuse-StremRecord object and load next StreamRecord - resetReuse(); - reuse = recordIterator.next(reuse); + // read next record + readNext(); } // Stop all remaining threads from policies @@ -358,7 +354,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { clonedDistributedEvictionPolicies); } - groupInvokable.initialize(collector, recordIterator, inSerializer, isMutable); + groupInvokable.setup(taskContext); groupInvokable.open(this.parameters); windowingGroups.put(keySelector.getKey(element.getObject()), groupInvokable); @@ -366,21 +362,6 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { } @Override - protected void mutableInvoke() throws Exception { - if (LOG.isInfoEnabled()) { - LOG.info("There is currently no mutable implementation of this operator. Immutable version is used."); - } - immutableInvoke(); - } - - @Override - protected void callUserFunction() throws Exception { - // This method gets never called directly. The user function calls are - // all delegated to the invokable instanced which handle/represent the - // groups. - } - - @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.parameters = parameters; @@ -456,12 +437,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { * buffer. */ private void evictElements(int numToEvict) { - HashSet<WindowInvokable<IN, OUT>> usedGroups=new HashSet<WindowInvokable<IN,OUT>>(); + HashSet<WindowInvokable<IN, OUT>> usedGroups = new HashSet<WindowInvokable<IN, OUT>>(); for (; numToEvict > 0; numToEvict--) { - WindowInvokable<IN, OUT> currentGroup=deleteOrderForCentralEviction.getFirst(); - //Do the eviction + WindowInvokable<IN, OUT> currentGroup = deleteOrderForCentralEviction.getFirst(); + // Do the eviction currentGroup.evictFirst(); - //Remember groups which possibly have an empty buffer after the eviction + // Remember groups which possibly have an empty buffer after the + // eviction usedGroups.add(currentGroup); try { deleteOrderForCentralEviction.removeFirst(); @@ -471,13 +453,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { } } - - //Remove groups with empty buffer - for (WindowInvokable<IN, OUT> group:usedGroups){ + + // Remove groups with empty buffer + for (WindowInvokable<IN, OUT> group : usedGroups) { checkForEmptyGroupBuffer(group); } } - + /** * Checks if the element buffer of a given windowing group is empty. If so, * the group will be deleted. http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java index 4feb4f3..6be96ec 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java @@ -31,22 +31,14 @@ public class MapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { } @Override - protected void immutableInvoke() throws Exception { - while ((reuse = recordIterator.next(reuse)) != null) { - callUserFunctionAndLogException(); - resetReuse(); - } - } - - @Override - protected void mutableInvoke() throws Exception { - while ((reuse = recordIterator.next(reuse)) != null) { + public void invoke() throws Exception { + while (readNext() != null) { callUserFunctionAndLogException(); } } @Override protected void callUserFunction() throws Exception { - collector.collect(mapper.map(reuse.getObject())); + collector.collect(mapper.map(nextRecord.getObject())); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java index 4666a85..c9d9e5a 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java @@ -39,13 +39,8 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, } @Override - protected void immutableInvoke() throws Exception { - mutableInvoke(); - } - - @Override - protected void mutableInvoke() throws Exception { - while ((reuse = recordIterator.next(reuse)) != null) { + public void invoke() throws Exception { + while (readNext() != null) { callUserFunctionAndLogException(); } } @@ -53,7 +48,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, @Override protected void callUserFunction() throws Exception { for (int i = 0; i < this.numFields; i++) { - outTuple.setField(reuse.getField(fields[i]), i); + outTuple.setField(nextRecord.getField(fields[i]), i); } collector.collect(outTuple); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java index d327c76..4bb78b8 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java @@ -34,22 +34,14 @@ public class StreamReduceInvokable<IN> extends StreamInvokable<IN, IN> { } @Override - protected void immutableInvoke() throws Exception { - while ((reuse = recordIterator.next(reuse)) != null) { - reduce(); - resetReuse(); - } - } - - @Override - protected void mutableInvoke() throws Exception { - while ((reuse = recordIterator.next(reuse)) != null) { + public void invoke() throws Exception { + while (readNext() != null) { reduce(); } } protected void reduce() throws Exception { - nextValue = reuse.getObject(); + nextValue = nextRecord.getObject(); callUserFunctionAndLogException(); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java index 0e740bb..ea891c9 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java @@ -29,8 +29,6 @@ import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback; import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { @@ -39,8 +37,6 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> */ private static final long serialVersionUID = -8038984294071650730L; - private static final Logger LOG = LoggerFactory.getLogger(WindowInvokable.class); - private LinkedList<TriggerPolicy<IN>> triggerPolicies; private LinkedList<EvictionPolicy<IN>> evictionPolicies; private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies; @@ -120,20 +116,19 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> } @Override - protected void immutableInvoke() throws Exception { + public void invoke() throws Exception { // Prevent empty data streams - if ((reuse = recordIterator.next(reuse)) == null) { + if (readNext() == null) { throw new RuntimeException("DataStream must not be empty"); } // Continuously run - while (reuse != null) { - processRealElement(reuse.getObject()); + while (nextRecord != null) { + processRealElement(nextRecord.getObject()); - // Recreate the reuse-StremRecord object and load next StreamRecord - resetReuse(); - reuse = recordIterator.next(reuse); + // Load next StreamRecord + readNext(); } // Stop all remaining threads from policies @@ -146,14 +141,6 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> } - @Override - protected void mutableInvoke() throws Exception { - if (LOG.isInfoEnabled()) { - LOG.info("There is currently no mutable implementation of this operator. Immutable version is used."); - } - immutableInvoke(); - } - /** * This method gets called in case of an grouped windowing in case central * trigger occurred and the arriving element causing the trigger is not part @@ -363,10 +350,10 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> * * @return true in case the buffer is empty otherwise false. */ - protected boolean isBufferEmpty(){ + protected boolean isBufferEmpty() { return buffer.isEmpty(); } - + /** * This method does the final reduce at the end of the stream and emits the * result. http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java index edf5a8f..4ed49fd 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java @@ -63,7 +63,7 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, } @Override - public void immutableInvoke() throws Exception { + public void invoke() throws Exception { while (true) { int next = recordIterator.next(reuse1, reuse2); if (next == 0) { @@ -100,13 +100,6 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, return batch2; } - @Override - // TODO: implement mutableInvoke for reduce - protected void mutableInvoke() throws Exception { - System.out.println("Immutable setting is used"); - immutableInvoke(); - } - protected void reduce1(StreamBatch<IN1> batch) { this.currentBatch1 = batch; callUserFunctionAndLogException1(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java index 25ed62c..604873e 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecord; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.api.streamvertex.StreamTaskContext; import org.apache.flink.streaming.io.CoReaderIterator; -import org.apache.flink.util.Collector; import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,19 +45,18 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU protected TypeSerializer<IN1> serializer1; protected TypeSerializer<IN2> serializer2; - public void initialize(Collector<OUT> collector, - CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator, - StreamRecordSerializer<IN1> serializer1, StreamRecordSerializer<IN2> serializer2, - boolean isMutable) { - this.collector = collector; + @Override + public void setup(StreamTaskContext<OUT> taskContext) { + this.collector = taskContext.getOutputCollector(); + + this.recordIterator = taskContext.getCoReader(); + + this.srSerializer1 = taskContext.getInputSerializer(0); + this.srSerializer2 = taskContext.getInputSerializer(1); - this.recordIterator = recordIterator; - this.reuse1 = serializer1.createInstance(); - this.reuse2 = serializer2.createInstance(); + this.reuse1 = srSerializer1.createInstance(); + this.reuse2 = srSerializer2.createInstance(); - this.srSerializer1 = serializer1; - this.srSerializer2 = serializer2; - this.isMutable = isMutable; this.serializer1 = srSerializer1.getObjectSerializer(); this.serializer2 = srSerializer2.getObjectSerializer(); } @@ -76,7 +75,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU } @Override - protected void immutableInvoke() throws Exception { + public void invoke() throws Exception { while (true) { int next = recordIterator.next(reuse1, reuse2); if (next == 0) { @@ -93,22 +92,6 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU } } - @Override - protected void mutableInvoke() throws Exception { - while (true) { - int next = recordIterator.next(reuse1, reuse2); - if (next == 0) { - break; - } else if (next == 1) { - initialize1(); - handleStream1(); - } else { - initialize2(); - handleStream2(); - } - } - } - protected abstract void handleStream1() throws Exception; protected abstract void handleStream2() throws Exception; @@ -147,8 +130,4 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU } } - @Override - protected void callUserFunction() throws Exception { - } - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java index be3f578..7df5668 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java @@ -59,11 +59,6 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> } @Override - protected void mutableInvoke() throws Exception { - throw new RuntimeException("Reducing mutable sliding batch is not supported."); - } - - @Override protected void handleStream1() throws Exception { window.addToBuffer1(reuse1.getObject()); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java index 0058c66..9321bc7 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java @@ -30,8 +30,6 @@ import org.apache.flink.util.MutableObjectIterator; public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> { - private OutputHandler<OUT> outputHandler; - protected StreamRecordSerializer<IN1> inputDeserializer1 = null; protected StreamRecordSerializer<IN2> inputDeserializer2 = null; @@ -68,8 +66,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> { @Override protected void setInvokable() { userInvokable = configuration.getUserInvokable(userClassLoader); - userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1, - inputDeserializer2, isMutable); + userInvokable.setup(this); } protected void setConfigInputs() throws StreamVertexException { @@ -105,4 +102,36 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> { outputHandler.invokeUserFunction("CO-TASK", userInvokable); } + @SuppressWarnings("unchecked") + @Override + public <X> MutableObjectIterator<X> getInput(int index) { + switch (index) { + case 0: + return (MutableObjectIterator<X>) inputIter1; + case 1: + return (MutableObjectIterator<X>) inputIter2; + default: + throw new IllegalArgumentException("CoStreamVertex has only 2 inputs"); + } + } + + @SuppressWarnings("unchecked") + @Override + public <X> StreamRecordSerializer<X> getInputSerializer(int index) { + switch (index) { + case 0: + return (StreamRecordSerializer<X>) inputDeserializer1; + case 1: + return (StreamRecordSerializer<X>) inputDeserializer2; + default: + throw new IllegalArgumentException("CoStreamVertex has only 2 inputs"); + } + } + + @SuppressWarnings("unchecked") + @Override + public <X, Y> CoReaderIterator<X, Y> getCoReader() { + return (CoReaderIterator<X, Y>) coIter; + } + } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java new file mode 100644 index 0000000..7fbab3b --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.streamvertex; + +import org.apache.flink.streaming.api.StreamConfig; +import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.io.CoReaderIterator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +public interface StreamTaskContext<OUT> { + + StreamConfig getConfig(); + + ClassLoader getUserCodeClassLoader(); + + <X> MutableObjectIterator<X> getInput(int index); + + <X> StreamRecordSerializer<X> getInputSerializer(int index); + + Collector<OUT> getOutputCollector(); + + <X, Y> CoReaderIterator<X, Y> getCoReader(); +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index 13e6c9f..7504efd 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -23,9 +23,13 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.streaming.api.StreamConfig; import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.io.CoReaderIterator; import org.apache.flink.streaming.state.OperatorState; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; -public class StreamVertex<IN, OUT> extends AbstractInvokable { +public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT> { private static int numTasks; @@ -33,12 +37,11 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable { protected int instanceID; protected String name; private static int numVertices = 0; - protected boolean isMutable; protected Object function; protected String functionName; private InputHandler<IN> inputHandler; - private OutputHandler<OUT> outputHandler; + protected OutputHandler<OUT> outputHandler; private StreamInvokable<IN, OUT> userInvokable; private StreamingRuntimeContext context; @@ -68,7 +71,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable { this.userClassLoader = getUserCodeClassLoader(); this.configuration = new StreamConfig(getTaskConfiguration()); this.name = configuration.getVertexName(); - this.isMutable = configuration.getMutability(); this.functionName = configuration.getFunctionName(); this.function = configuration.getFunction(userClassLoader); this.states = configuration.getOperatorStates(userClassLoader); @@ -89,8 +91,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable { protected void setInvokable() { userInvokable = configuration.getUserInvokable(userClassLoader); - userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(), - inputHandler.getInputSerializer(), isMutable); + userInvokable.setup(this); } public String getName() { @@ -111,4 +112,39 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable { public void invoke() throws Exception { outputHandler.invokeUserFunction("TASK", userInvokable); } + + @Override + public StreamConfig getConfig() { + return configuration; + } + + @SuppressWarnings("unchecked") + @Override + public <X> MutableObjectIterator<X> getInput(int index) { + if (index == 0) { + return (MutableObjectIterator<X>) inputHandler.getInputIter(); + } else { + throw new IllegalArgumentException("There is only 1 input"); + } + } + + @SuppressWarnings("unchecked") + @Override + public <X> StreamRecordSerializer<X> getInputSerializer(int index) { + if (index == 0) { + return (StreamRecordSerializer<X>) inputHandler.getInputSerializer(); + } else { + throw new IllegalArgumentException("There is only 1 input"); + } + } + + @Override + public Collector<OUT> getOutputCollector() { + return outputHandler.getCollector(); + } + + @Override + public <X, Y> CoReaderIterator<X, Y> getCoReader() { + throw new IllegalArgumentException("CoReader not available"); + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java index 0fbf72a..9376166 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java @@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.function.aggregation.SumAggregator; import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable; -import org.apache.flink.streaming.util.MockInvokable; +import org.apache.flink.streaming.util.MockContext; import org.apache.flink.streaming.util.keys.TupleKeySelector; import org.junit.Test; @@ -102,24 +102,24 @@ public class AggregationFunctionTest { .getAggregator(1, type1, AggregationType.MAX); ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0, type2, AggregationType.MAX); - List<Tuple2<Integer, Integer>> sumList = MockInvokable.createAndExecute( + List<Tuple2<Integer, Integer>> sumList = MockContext.createAndExecute( new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList()); - List<Tuple2<Integer, Integer>> minList = MockInvokable.createAndExecute( + List<Tuple2<Integer, Integer>> minList = MockContext.createAndExecute( new StreamReduceInvokable<Tuple2<Integer, Integer>>(minFunction), getInputList()); - List<Tuple2<Integer, Integer>> maxList = MockInvokable.createAndExecute( + List<Tuple2<Integer, Integer>> maxList = MockContext.createAndExecute( new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList()); - List<Tuple2<Integer, Integer>> groupedSumList = MockInvokable.createAndExecute( + List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute( new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList()); - List<Tuple2<Integer, Integer>> groupedMinList = MockInvokable.createAndExecute( + List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute( new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction, new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList()); - List<Tuple2<Integer, Integer>> groupedMaxList = MockInvokable.createAndExecute( + List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute( new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList()); @@ -129,11 +129,11 @@ public class AggregationFunctionTest { assertEquals(expectedGroupSumList, groupedSumList); assertEquals(expectedGroupMinList, groupedMinList); assertEquals(expectedGroupMaxList, groupedMaxList); - assertEquals(expectedSumList0, MockInvokable.createAndExecute( + assertEquals(expectedSumList0, MockContext.createAndExecute( new StreamReduceInvokable<Integer>(sumFunction0), simpleInput)); - assertEquals(expectedMinList0, MockInvokable.createAndExecute( + assertEquals(expectedMinList0, MockContext.createAndExecute( new StreamReduceInvokable<Integer>(minFunction0), simpleInput)); - assertEquals(expectedMaxList0, MockInvokable.createAndExecute( + assertEquals(expectedMaxList0, MockContext.createAndExecute( new StreamReduceInvokable<Integer>(maxFunction0), simpleInput)); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); @@ -210,16 +210,16 @@ public class AggregationFunctionTest { minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6)); minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6)); - assertEquals(maxByFirstExpected, MockInvokable.createAndExecute( + assertEquals(maxByFirstExpected, MockContext.createAndExecute( new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst), getInputList())); - assertEquals(maxByLastExpected, MockInvokable.createAndExecute( + assertEquals(maxByLastExpected, MockContext.createAndExecute( new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast), getInputList())); - assertEquals(minByLastExpected, MockInvokable.createAndExecute( + assertEquals(minByLastExpected, MockContext.createAndExecute( new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast), getInputList())); - assertEquals(minByFirstExpected, MockInvokable.createAndExecute( + assertEquals(minByFirstExpected, MockContext.createAndExecute( new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst), getInputList())); @@ -284,16 +284,16 @@ public class AggregationFunctionTest { minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6)); minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6)); - assertEquals(maxByFirstExpected, MockInvokable.createAndExecute( + assertEquals(maxByFirstExpected, MockContext.createAndExecute( new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst), getInputList())); - assertEquals(maxByLastExpected, MockInvokable.createAndExecute( + assertEquals(maxByLastExpected, MockContext.createAndExecute( new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast), getInputList())); - assertEquals(minByLastExpected, MockInvokable.createAndExecute( + assertEquals(minByLastExpected, MockContext.createAndExecute( new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast), getInputList())); - assertEquals(minByFirstExpected, MockInvokable.createAndExecute( + assertEquals(minByFirstExpected, MockContext.createAndExecute( new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst), getInputList())); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java index 4ab1be2..f46bfc5 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java @@ -113,7 +113,7 @@ public class DirectedOutputTest { source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD)); source.selectAll().addSink(new ListSink(ALL)); - env.executeTest(128); + env.executeTest(32); assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN)); assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN)); assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java index 44b0513..1db286c 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java @@ -25,7 +25,7 @@ import java.util.List; import org.apache.flink.streaming.api.function.co.CoReduceFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable; -import org.apache.flink.streaming.util.MockCoInvokable; +import org.apache.flink.streaming.util.MockCoContext; import org.junit.Test; public class CoBatchReduceTest { @@ -84,7 +84,7 @@ public class CoBatchReduceTest { expected.add("def"); expected.add("ghi"); - List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2); + List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2); Collections.sort(result); Collections.sort(expected); @@ -125,7 +125,7 @@ public class CoBatchReduceTest { expected.add("efg"); expected.add("ghi"); - List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2); + List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2); Collections.sort(result); Collections.sort(expected); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java index 5009496..a91bd0c 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java @@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.co.CoFlatMapFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable; -import org.apache.flink.streaming.util.MockCoInvokable; +import org.apache.flink.streaming.util.MockCoContext; import org.apache.flink.util.Collector; import org.junit.Test; @@ -59,7 +59,7 @@ public class CoFlatMapTest implements Serializable { List<String> expectedList = Arrays.asList("a", "b", "c", "1", "d", "e", "f", "2", "g", "h", "e", "3", "4", "5"); - List<String> actualList = MockCoInvokable.createAndExecute(invokable, + List<String> actualList = MockCoContext.createAndExecute(invokable, Arrays.asList("abc", "def", "ghe"), Arrays.asList(1, 2, 3, 4, 5)); assertEquals(expectedList, actualList); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java index ce01a7d..bc19a89 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java @@ -26,7 +26,7 @@ import java.util.List; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.function.co.CoReduceFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable; -import org.apache.flink.streaming.util.MockCoInvokable; +import org.apache.flink.streaming.util.MockCoContext; import org.apache.flink.streaming.util.keys.TupleKeySelector; import org.junit.Test; @@ -99,7 +99,7 @@ public class CoGroupedBatchReduceTest { new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new TupleKeySelector(0), new TupleKeySelector(0)); - List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2); + List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2); Collections.sort(result); Collections.sort(expected); @@ -146,7 +146,7 @@ public class CoGroupedBatchReduceTest { new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new TupleKeySelector(0), new TupleKeySelector(0)); - List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2); + List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2); Collections.sort(result); Collections.sort(expected); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java index 4570e23..15d42a4 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java @@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.function.co.CoReduceFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable; -import org.apache.flink.streaming.util.MockCoInvokable; +import org.apache.flink.streaming.util.MockCoContext; import org.apache.flink.streaming.util.keys.TupleKeySelector; import org.junit.Test; @@ -77,7 +77,7 @@ public class CoGroupedReduceTest { List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5", "7"); - List<String> actualList = MockCoInvokable.createAndExecute(invokable, + List<String> actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5)); assertEquals(expected, actualList); @@ -87,7 +87,7 @@ public class CoGroupedReduceTest { expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7"); - actualList = MockCoInvokable.createAndExecute(invokable, + actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5)); assertEquals(expected, actualList); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java index f36a7b5..b5a7e8d 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java @@ -29,7 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.function.co.CoReduceFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable; import org.apache.flink.streaming.api.invokable.util.TimeStamp; -import org.apache.flink.streaming.util.MockCoInvokable; +import org.apache.flink.streaming.util.MockCoContext; import org.apache.flink.streaming.util.keys.TupleKeySelector; import org.junit.Test; @@ -129,7 +129,7 @@ public class CoGroupedWindowReduceTest { new TupleKeySelector( 0), new MyTimeStamp<Tuple2<String, Integer>>( timestamps1), new MyTimeStamp<Tuple2<String, String>>(timestamps2)); - List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2); + List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2); Collections.sort(result); Collections.sort(expected); @@ -182,7 +182,7 @@ public class CoGroupedWindowReduceTest { new TupleKeySelector( 0), new MyTimeStamp<Tuple2<String, Integer>>( timestamps1), new MyTimeStamp<Tuple2<String, String>>(timestamps2)); - List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2); + List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2); Collections.sort(result); Collections.sort(expected); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java index 9c62aec..93d1741 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java @@ -25,7 +25,7 @@ import java.util.List; import org.apache.flink.streaming.api.function.co.CoMapFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable; -import org.apache.flink.streaming.util.MockCoInvokable; +import org.apache.flink.streaming.util.MockCoContext; import org.junit.Test; public class CoMapTest implements Serializable { @@ -50,7 +50,7 @@ public class CoMapTest implements Serializable { CoMapInvokable<Double, Integer, String> invokable = new CoMapInvokable<Double, Integer, String>(new MyCoMap()); List<String> expectedList = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5"); - List<String> actualList = MockCoInvokable.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3)); + List<String> actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3)); assertEquals(expectedList, actualList); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java index 996320a..3343ba0 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java @@ -24,7 +24,7 @@ import java.util.List; import org.apache.flink.streaming.api.function.co.CoReduceFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable; -import org.apache.flink.streaming.util.MockCoInvokable; +import org.apache.flink.streaming.util.MockCoContext; import org.junit.Test; public class CoStreamReduceTest { @@ -62,7 +62,7 @@ public class CoStreamReduceTest { new MyCoReduceFunction()); List<Integer> expected1 = Arrays.asList(1, 9, 2, 99, 6, 998, 24); - List<Integer> result = MockCoInvokable.createAndExecute(coReduce, + List<Integer> result = MockCoContext.createAndExecute(coReduce, Arrays.asList(1, 2, 3, 4), Arrays.asList("9", "9", "8")); assertEquals(expected1, result); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java index 4604b27..90ad483 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java @@ -28,7 +28,7 @@ import java.util.List; import org.apache.flink.streaming.api.function.co.CoReduceFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable; import org.apache.flink.streaming.api.invokable.util.TimeStamp; -import org.apache.flink.streaming.util.MockCoInvokable; +import org.apache.flink.streaming.util.MockCoContext; import org.junit.Test; public class CoWindowReduceTest { @@ -114,7 +114,7 @@ public class CoWindowReduceTest { expected.add("abcde"); expected.add("fghi"); - List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2); + List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2); Collections.sort(result); Collections.sort(expected); @@ -160,7 +160,7 @@ public class CoWindowReduceTest { expected.add("fgh"); expected.add("hi"); - List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2); + List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2); Collections.sort(result); Collections.sort(expected); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java index ebdd963..c6d446a 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java @@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.function.co.CoWindowFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; import org.apache.flink.streaming.api.invokable.util.TimeStamp; -import org.apache.flink.streaming.util.MockCoInvokable; +import org.apache.flink.streaming.util.MockCoContext; import org.apache.flink.util.Collector; import org.junit.Test; @@ -148,7 +148,7 @@ public class CoWindowTest { expected1.add(0); expected1.add(1); - List<Integer> actual1 = MockCoInvokable.createAndExecute(invokable1, input11, input12); + List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12); assertEquals(expected1, actual1); CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>( @@ -183,7 +183,7 @@ public class CoWindowTest { expected2.add(8); expected2.add(7); - List<Integer> actual2 = MockCoInvokable.createAndExecute(invokable2, input21, input22); + List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22); assertEquals(expected2, actual2); } }
