Repository: flink Updated Branches: refs/heads/master c94fdcdd5 -> 8754352ff
[FLINK-2631] [streaming] Fixes the StreamFold operator and adds OutputTypeConfigurable interface to support type injection at StreamGraph creation. Adds test for non-serializable fold type. Adds test to verify proper output type forwarding for OutputTypeConfigurable implementations. Makes OutputTypeConfigurable typed and tests that TwoInputStreamOperator implementations are output type configurable. This closes #1101 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c2791b0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c2791b0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c2791b0 Branch: refs/heads/master Commit: 9c2791b0a1b8bd3b0fb189220749d1c82f7a0d09 Parents: c94fdcd Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Sep 7 11:34:48 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu Sep 10 12:28:47 2015 +0200 ---------------------------------------------------------------------- .../api/datastream/GroupedDataStream.java | 2 +- .../flink/streaming/api/graph/StreamGraph.java | 35 ++- .../api/graph/StreamGraphGenerator.java | 2 +- .../api/operators/OutputTypeConfigurable.java | 42 ++++ .../streaming/api/operators/StreamFold.java | 51 +++- .../api/operators/StreamGroupedFold.java | 28 ++- .../api/operators/StreamGroupedReduce.java | 8 +- .../streaming/api/operators/StreamReduce.java | 3 +- .../streaming/api/StreamingOperatorsITCase.java | 230 +++++++++++++++++++ .../api/graph/StreamGraphGeneratorTest.java | 129 ++++++++++- .../api/operators/StreamGroupedFoldTest.java | 10 +- ...ScalaStreamingMultipleProgramsTestBase.scala | 55 +++++ .../api/scala/StreamingOperatorsITCase.scala | 116 ++++++++++ 13 files changed, 679 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java index 72ef945..a1106bc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java @@ -88,7 +88,7 @@ public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> { Utils.getCallLocationName(), true); return transform("Grouped Fold", outType, new StreamGroupedFold<OUT, R>(clean(folder), - keySelector, initialValue, outType)); + keySelector, initialValue)); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 6474ae9..cda5686 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.StateHandleProvider; import 
org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; @@ -190,8 +191,12 @@ public class StreamGraph extends StreamingPlan { sinks.add(vertexID); } - public <IN, OUT> void addOperator(Integer vertexID, StreamOperator<OUT> operatorObject, - TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) { + public <IN, OUT> void addOperator( + Integer vertexID, + StreamOperator<OUT> operatorObject, + TypeInformation<IN> inTypeInfo, + TypeInformation<OUT> outTypeInfo, + String operatorName) { if (operatorObject instanceof StreamSource) { addNode(vertexID, SourceStreamTask.class, operatorObject, operatorName); @@ -205,22 +210,40 @@ public class StreamGraph extends StreamingPlan { setSerializers(vertexID, inSerializer, null, outSerializer); + if (operatorObject instanceof OutputTypeConfigurable) { + @SuppressWarnings("unchecked") + OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) operatorObject; + // sets the output type which must be know at StreamGraph creation time + outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig); + } + if (LOG.isDebugEnabled()) { LOG.debug("Vertex: {}", vertexID); } } - public <IN1, IN2, OUT> void addCoOperator(Integer vertexID, - TwoInputStreamOperator<IN1, IN2, OUT> taskoperatorObject, TypeInformation<IN1> in1TypeInfo, - TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) { + public <IN1, IN2, OUT> void addCoOperator( + Integer vertexID, + TwoInputStreamOperator<IN1, IN2, OUT> taskOperatorObject, + TypeInformation<IN1> 
in1TypeInfo, + TypeInformation<IN2> in2TypeInfo, + TypeInformation<OUT> outTypeInfo, + String operatorName) { - addNode(vertexID, TwoInputStreamTask.class, taskoperatorObject, operatorName); + addNode(vertexID, TwoInputStreamTask.class, taskOperatorObject, operatorName); TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null; setSerializers(vertexID, in1TypeInfo.createSerializer(executionConfig), in2TypeInfo.createSerializer(executionConfig), outSerializer); + if (taskOperatorObject instanceof OutputTypeConfigurable) { + @SuppressWarnings("unchecked") + OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) taskOperatorObject; + // sets the output type which must be know at StreamGraph creation time + outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig); + } + if (LOG.isDebugEnabled()) { LOG.debug("CO-TASK: {}", vertexID); } http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 6df8cb5..774c00b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -47,7 +47,7 @@ import java.util.Map; * * <p> * This traverses the tree of {@code StreamTransformations} starting from the sinks. 
At each - * we transformation recursively transform the inputs, then create a node in the {@code StreamGraph} + * transformation we recursively transform the inputs, then create a node in the {@code StreamGraph} * and add edges from the input Nodes to our newly created node. The transformation methods * return the IDs of the nodes in the StreamGraph that represent the input transformation. Several * IDs can be returned to be able to deal with feedback transformations and unions. http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java new file mode 100644 index 0000000..1d05966 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** + * Stream operators can implement this interface if they need access to the output type information + * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for + * cases where the output type is specified by the returns method and, thus, after the stream + * operator has been created. + */ +public interface OutputTypeConfigurable<OUT> { + + /** + * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, StreamOperator, TypeInformation, TypeInformation, String)} + * method when the {@link org.apache.flink.streaming.api.graph.StreamGraph} is generated. The + * method is called with the output {@link TypeInformation} which is also used for the + * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer. 
+ * + * @param outTypeInfo Output type information of the {@link org.apache.flink.streaming.runtime.tasks.StreamTask} + * @param executionConfig Execution configuration + */ + void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig); +} http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java index a5e5264..81115f0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java @@ -17,27 +17,36 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + public class StreamFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>> - implements OneInputStreamOperator<IN, 
OUT> { + implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> { private static final long serialVersionUID = 1L; - private OUT accumulator; + protected transient OUT accumulator; + private byte[] serializedInitialValue; + protected TypeSerializer<OUT> outTypeSerializer; - protected TypeInformation<OUT> outTypeInformation; - public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue, TypeInformation<OUT> outTypeInformation) { + public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue) { super(folder); this.accumulator = initialValue; - this.outTypeInformation = outTypeInformation; this.chainingStrategy = ChainingStrategy.FORCE_ALWAYS; } @@ -50,11 +59,41 @@ public class StreamFold<IN, OUT> @Override public void open(Configuration config) throws Exception { super.open(config); - this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig); + + if (serializedInitialValue == null) { + throw new RuntimeException("No initial value was serialized for the fold " + + "operator. 
Probably the setOutputType method was not called."); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); + InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper( + new DataInputStream(bais) + ); + + accumulator = outTypeSerializer.deserialize(in); } @Override public void processWatermark(Watermark mark) throws Exception { output.emitWatermark(mark); } + + @Override + public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) { + outTypeSerializer = outTypeInfo.createSerializer(executionConfig); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper( + new DataOutputStream(baos) + ); + + try { + outTypeSerializer.serialize(accumulator, out); + } catch (IOException ioe) { + throw new RuntimeException("Unable to serialize initial value of type " + + accumulator.getClass().getSimpleName() + " of fold operator.", ioe); + } + + serializedInitialValue = baos.toByteArray(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java index 5272a48..f4e44c6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java @@ -21,8 +21,8 @@ import java.util.HashMap; import java.util.Map; import 
org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> { @@ -30,28 +30,34 @@ public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> { private static final long serialVersionUID = 1L; private KeySelector<IN, ?> keySelector; - private Map<Object, OUT> values; - private OUT initialValue; + private transient Map<Object, OUT> values; - public StreamGroupedFold(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector, - OUT initialValue, TypeInformation<OUT> outTypeInformation) { - super(folder, initialValue, outTypeInformation); + public StreamGroupedFold( + FoldFunction<IN, OUT> folder, + KeySelector<IN, ?> keySelector, + OUT initialValue) { + super(folder, initialValue); this.keySelector = keySelector; - this.initialValue = initialValue; + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + values = new HashMap<Object, OUT>(); } @Override public void processElement(StreamRecord<IN> element) throws Exception { Object key = keySelector.getKey(element.getValue()); - OUT accumulator = values.get(key); + OUT value = values.get(key); - if (accumulator != null) { - OUT folded = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue()); + if (value != null) { + OUT folded = userFunction.fold(outTypeSerializer.copy(value), element.getValue()); values.put(key, folded); output.collect(element.replace(folded)); } else { - OUT first = userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue()); + OUT first = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue()); values.put(key, first); output.collect(element.replace(first)); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java index 6be011e..7533c33 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java @@ -29,17 +29,21 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> { private static final long serialVersionUID = 1L; private KeySelector<IN, ?> keySelector; - private Map<Object, IN> values; + private transient Map<Object, IN> values; public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) { super(reducer); this.keySelector = keySelector; - values = new HashMap<Object, IN>(); } @Override public void processElement(StreamRecord<IN> element) throws Exception { Object key = keySelector.getKey(element.getValue()); + + if (values == null) { + values = new HashMap<Object, IN>(); + } + IN currentValue = values.get(key); if (currentValue != null) { // TODO: find a way to let operators copy elements (maybe) http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java index 52c07d0..af562fe 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java @@ -26,7 +26,7 @@ public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFuncti private static final long serialVersionUID = 1L; - private IN currentValue; + private transient IN currentValue; public StreamReduce(ReduceFunction<IN> reducer) { super(reducer); @@ -42,7 +42,6 @@ public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFuncti currentValue = userFunction.reduce(currentValue, element.getValue()); } else { currentValue = element.getValue(); - } output.collect(element.replace(currentValue)); } http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java new file mode 100644 index 0000000..11100a4 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api; + +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SplitDataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.ArrayList; +import java.util.List; + +public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase { + + private String resultPath1; + private String resultPath2; + private String expected1; + private String expected2; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception { + resultPath1 = tempFolder.newFile().toURI().toString(); + resultPath2 = tempFolder.newFile().toURI().toString(); + expected1 = ""; + 
expected2 = ""; + } + + @After + public void after() throws Exception { + compareResultsByLinesInMemory(expected1, resultPath1); + compareResultsByLinesInMemory(expected2, resultPath2); + } + + /** + * Tests the proper functioning of the streaming fold operator. For this purpose, a stream + * of Tuple2<Integer, Integer> is created. The stream is grouped according to the first tuple + * value. Each group is folded where the second tuple value is summed up. + * + * @throws Exception + */ + @Test + public void testFoldOperation() throws Exception { + int numElements = 10; + int numKeys = 2; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys)); + + SplitDataStream<Tuple2<Integer, Integer>> splittedResult = sourceStream + .groupBy(0) + .fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() { + @Override + public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws Exception { + return accumulator + value.f1; + } + }).map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() { + @Override + public Tuple2<Integer, Integer> map(Integer value) throws Exception { + return new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), value); + } + }).split(new OutputSelector<Tuple2<Integer, Integer>>() { + @Override + public Iterable<String> select(Tuple2<Integer, Integer> value) { + List<String> output = new ArrayList<>(); + + output.add(value.f0 + ""); + + return output; + } + }); + + splittedResult.select("0").map(new MapFunction<Tuple2<Integer,Integer>, Integer>() { + @Override + public Integer map(Tuple2<Integer, Integer> value) throws Exception { + return value.f1; + } + }).writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE); + + splittedResult.select("1").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() { + @Override + public Integer map(Tuple2<Integer, Integer> value) 
throws Exception { + return value.f1; + } + }).writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE); + + StringBuilder builder1 = new StringBuilder(); + StringBuilder builder2 = new StringBuilder(); + int counter1 = 0; + int counter2 = 0; + + for (int i = 0; i < numElements; i++) { + if (i % 2 == 0) { + counter1 += i; + builder1.append(counter1 + "\n"); + } else { + counter2 += i; + builder2.append(counter2 + "\n"); + } + } + + expected1 = builder1.toString(); + expected2 = builder2.toString(); + + env.execute(); + } + + /** + * Tests whether the fold operation can also be called with non Java serializable types. + */ + @Test + public void testFoldOperationWithNonJavaSerializableType() throws Exception { + final int numElements = 10; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements)); + + input + .groupBy(0) + .fold( + new NonSerializable(42), + new FoldFunction<Tuple2<Integer, NonSerializable>, NonSerializable>() { + @Override + public NonSerializable fold(NonSerializable accumulator, Tuple2<Integer, NonSerializable> value) throws Exception { + return new NonSerializable(accumulator.value + value.f1.value); + } + }) + .map(new MapFunction<NonSerializable, Integer>() { + @Override + public Integer map(NonSerializable value) throws Exception { + return value.value; + } + }) + .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE); + + StringBuilder builder = new StringBuilder(); + + for (int i = 0; i < numElements; i++) { + builder.append(42 + i + "\n"); + } + + expected1 = builder.toString(); + + env.execute(); + } + + private static class NonSerializable { + // This makes the type non-serializable + private final Object obj = new Object(); + + private final int value; + + public NonSerializable(int value) { + this.value = value; + } + } + + private static class NonSerializableTupleSource implements 
SourceFunction<Tuple2<Integer, NonSerializable>> { + private final int numElements; + + public NonSerializableTupleSource(int numElements) { + this.numElements = numElements; + } + + + @Override + public void run(SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception { + for (int i = 0; i < numElements; i++) { + ctx.collect(new Tuple2<Integer, NonSerializable>(i, new NonSerializable(i))); + } + } + + @Override + public void cancel() {} + } + + private static class TupleSource implements SourceFunction<Tuple2<Integer, Integer>> { + + private final int numElements; + private final int numKeys; + + public TupleSource(int numElements, int numKeys) { + this.numElements = numElements; + this.numKeys = numKeys; + } + + @Override + public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { + for (int i = 0; i < numElements; i++) { + Tuple2<Integer, Integer> result = new Tuple2<>(i % numKeys, i); + ctx.collect(result); + } + } + + @Override + public void cancel() { + + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index fb2ef56..3b05274 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -17,18 +17,29 @@ */ package org.apache.flink.streaming.api.graph; +import org.apache.flink.api.common.ExecutionConfig; +import 
org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.ConnectedDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; import org.apache.flink.streaming.util.EvenOddOutputSelector; import org.apache.flink.streaming.util.NoOpIntMap; import org.apache.flink.streaming.util.NoOpSink; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.apache.flink.streaming.util.TestStreamEnvironment; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -176,4 +187,120 @@ public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase } + /** + * Test whether an {@link OutputTypeConfigurable} implementation gets called with the correct + * output type. In this test case the output type must be BasicTypeInfo.INT_TYPE_INFO. 
+ * + * @throws Exception + */ + @Test + public void testOutputTypeConfigurationWithOneInputTransformation() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Integer> source = env.fromElements(1, 10); + + OutputTypeConfigurableOperationWithOneInput outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithOneInput(); + + DataStream<Integer> result = source.transform( + "Single input and output type configurable operation", + BasicTypeInfo.INT_TYPE_INFO, + outputTypeConfigurableOperation); + + result.addSink(new NoOpSink<Integer>()); + + StreamGraph graph = env.getStreamGraph(); + + assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation()); + } + + @Test + public void testOutputTypeConfigurationWithTwoInputTransformation() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Integer> source1 = env.fromElements(1, 10); + DataStream<Integer> source2 = env.fromElements(2, 11); + + ConnectedDataStream<Integer, Integer> connectedSource = source1.connect(source2); + + OutputTypeConfigurableOperationWithTwoInputs outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithTwoInputs(); + + DataStream<Integer> result = connectedSource.transform( + "Two input and output type configurable operation", + BasicTypeInfo.INT_TYPE_INFO, + outputTypeConfigurableOperation); + + result.addSink(new NoOpSink<Integer>()); + + StreamGraph graph = env.getStreamGraph(); + + assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation()); + } + + private static class OutputTypeConfigurableOperationWithTwoInputs + extends AbstractStreamOperator<Integer> + implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> { + + TypeInformation<Integer> tpeInformation; + + public TypeInformation<Integer> getTypeInformation() { + 
return tpeInformation; + } + + @Override + public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) { + tpeInformation = outTypeInfo; + } + + @Override + public void processElement1(StreamRecord element) throws Exception { + output.collect(element); + } + + @Override + public void processElement2(StreamRecord element) throws Exception { + output.collect(element); + } + + @Override + public void processWatermark1(Watermark mark) throws Exception { + + } + + @Override + public void processWatermark2(Watermark mark) throws Exception { + + } + + @Override + public void setup(Output output, StreamingRuntimeContext runtimeContext) { + + } + } + + private static class OutputTypeConfigurableOperationWithOneInput + extends AbstractStreamOperator<Integer> + implements OneInputStreamOperator<Integer, Integer>, OutputTypeConfigurable<Integer> { + + TypeInformation<Integer> tpeInformation; + + public TypeInformation<Integer> getTypeInformation() { + return tpeInformation; + } + + @Override + public void processElement(StreamRecord<Integer> element) throws Exception { + output.collect(element); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + + } + + @Override + public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) { + tpeInformation = outTypeInfo; + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java index dcfe3de..82dddfe 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators; import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.RichFoldFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -69,7 +70,9 @@ public class StreamGroupedFoldTest { public String getKey(Integer value) throws Exception { return value.toString(); } - }, "100", outType); + }, "100"); + + operator.setOutputType(outType, new ExecutionConfig()); OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator); @@ -104,7 +107,10 @@ public class StreamGroupedFoldTest { public Integer getKey(Integer value) throws Exception { return value; } - }, "init", BasicTypeInfo.STRING_TYPE_INFO); + }, "init"); + + operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig()); + OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator); long initialTime = 0L; http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala new file mode 100644 index 0000000..3342e1e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.runtime.StreamingMode +import org.apache.flink.streaming.util.TestStreamEnvironment +import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitSuiteLike + +trait ScalaStreamingMultipleProgramsTestBase + extends TestBaseUtils + with JUnitSuiteLike + with BeforeAndAfterAll { + + val parallelism = 4 + var cluster: Option[ForkableFlinkMiniCluster] = None + + override protected def beforeAll(): Unit = { + val cluster = Some( + TestBaseUtils.startCluster( + 1, + parallelism, + StreamingMode.STREAMING, + false, + false, + true + ) + ) + + val clusterEnvironment = new TestStreamEnvironment(cluster.get, parallelism) + } + + override protected def afterAll(): Unit = { + cluster.foreach { + TestBaseUtils.stopCluster(_, TestBaseUtils.DEFAULT_TIMEOUT) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala new file mode 100644 index 0000000..d5e2b7b --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.api.common.functions.{RichMapFunction, FoldFunction} +import org.apache.flink.core.fs.FileSystem +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.test.util.TestBaseUtils +import org.junit.rules.TemporaryFolder +import org.junit.{After, Before, Rule, Test} + +class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase { + + var resultPath1: String = _ + var resultPath2: String = _ + var expected1: String = _ + var expected2: String = _ + + val _tempFolder = new TemporaryFolder() + + @Rule + def tempFolder: TemporaryFolder = _tempFolder + + @Before + def before(): Unit = { + val temp = tempFolder + resultPath1 = temp.newFile.toURI.toString + resultPath2 = temp.newFile.toURI.toString + expected1 = "" + expected2 = "" + } + + @After + def after(): Unit = { + TestBaseUtils.compareResultsByLinesInMemory(expected1, resultPath1) + TestBaseUtils.compareResultsByLinesInMemory(expected2, resultPath2) + } + + /** Tests the streaming fold operation. For this purpose a stream of Tuple[Int, Int] is created. + * The stream is grouped by the first field. For each group, the resulting stream is folded by + * summing up the second tuple field. 
+ * + */ + @Test + def testFoldOperator(): Unit = { + val numElements = 10 + val numKeys = 2 + + val env = StreamExecutionEnvironment.getExecutionEnvironment + + env.setParallelism(2) + + val sourceStream = env.addSource(new SourceFunction[(Int, Int)] { + + override def run(ctx: SourceContext[(Int, Int)]): Unit = { + 0 until numElements foreach { + i => ctx.collect((i % numKeys, i)) + } + } + + override def cancel(): Unit = {} + }) + + val splittedResult = sourceStream + .groupBy(0) + .fold(0, new FoldFunction[(Int, Int), Int] { + override def fold(accumulator: Int, value: (Int, Int)): Int = { + accumulator + value._2 + } + }) + .map(new RichMapFunction[Int, (Int, Int)] { + override def map(value: Int): (Int, Int) = { + (getRuntimeContext.getIndexOfThisSubtask, value) + } + }) + .split{ + x => + Seq(x._1.toString) + } + + splittedResult + .select("0") + .map(_._2) + .getJavaStream + .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE) + splittedResult + .select("1") + .map(_._2) + .getJavaStream + .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE) + + val groupedSequence = 0 until numElements groupBy( _ % numKeys) + + expected1 = groupedSequence(0).scanLeft(0)(_ + _).tail.mkString("\n") + expected2 = groupedSequence(1).scanLeft(0)(_ + _).tail.mkString("\n") + + env.execute() + } +}