Repository: flink
Updated Branches:
  refs/heads/master c94fdcdd5 -> 8754352ff


[FLINK-2631] [streaming] Fixes the StreamFold operator and adds 
OutputTypeConfigurable interface to support type injection at StreamGraph 
creation.

Adds test for non serializable fold type. Adds test to verify proper output 
type forwarding for OutputTypeConfigurable implementations.
Makes OutputTypeConfigurable typed, tests that TwoInputStreamOperator is output 
type configurable

This closes #1101


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c2791b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c2791b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c2791b0

Branch: refs/heads/master
Commit: 9c2791b0a1b8bd3b0fb189220749d1c82f7a0d09
Parents: c94fdcd
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Sep 7 11:34:48 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Sep 10 12:28:47 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/GroupedDataStream.java       |   2 +-
 .../flink/streaming/api/graph/StreamGraph.java  |  35 ++-
 .../api/graph/StreamGraphGenerator.java         |   2 +-
 .../api/operators/OutputTypeConfigurable.java   |  42 ++++
 .../streaming/api/operators/StreamFold.java     |  51 +++-
 .../api/operators/StreamGroupedFold.java        |  28 ++-
 .../api/operators/StreamGroupedReduce.java      |   8 +-
 .../streaming/api/operators/StreamReduce.java   |   3 +-
 .../streaming/api/StreamingOperatorsITCase.java | 230 +++++++++++++++++++
 .../api/graph/StreamGraphGeneratorTest.java     | 129 ++++++++++-
 .../api/operators/StreamGroupedFoldTest.java    |  10 +-
 ...ScalaStreamingMultipleProgramsTestBase.scala |  55 +++++
 .../api/scala/StreamingOperatorsITCase.scala    | 116 ++++++++++
 13 files changed, 679 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 72ef945..a1106bc 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -88,7 +88,7 @@ public class GroupedDataStream<OUT> extends 
KeyedDataStream<OUT> {
                                Utils.getCallLocationName(), true);
 
                return transform("Grouped Fold", outType, new 
StreamGroupedFold<OUT, R>(clean(folder),
-                               keySelector, initialValue, outType));
+                               keySelector, initialValue));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 6474ae9..cda5686 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -190,8 +191,12 @@ public class StreamGraph extends StreamingPlan {
                sinks.add(vertexID);
        }
 
-       public <IN, OUT> void addOperator(Integer vertexID, StreamOperator<OUT> 
operatorObject,
-                       TypeInformation<IN> inTypeInfo, TypeInformation<OUT> 
outTypeInfo, String operatorName) {
+       public <IN, OUT> void addOperator(
+                       Integer vertexID,
+                       StreamOperator<OUT> operatorObject,
+                       TypeInformation<IN> inTypeInfo,
+                       TypeInformation<OUT> outTypeInfo,
+                       String operatorName) {
 
                if (operatorObject instanceof StreamSource) {
                        addNode(vertexID, SourceStreamTask.class, 
operatorObject, operatorName);
@@ -205,22 +210,40 @@ public class StreamGraph extends StreamingPlan {
 
                setSerializers(vertexID, inSerializer, null, outSerializer);
 
+               if (operatorObject instanceof OutputTypeConfigurable) {
+                       @SuppressWarnings("unchecked")
+                       OutputTypeConfigurable<OUT> outputTypeConfigurable = 
(OutputTypeConfigurable<OUT>) operatorObject;
+                       // sets the output type which must be known at 
StreamGraph creation time
+                       outputTypeConfigurable.setOutputType(outTypeInfo, 
executionConfig);
+               }
+
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Vertex: {}", vertexID);
                }
        }
 
-       public <IN1, IN2, OUT> void addCoOperator(Integer vertexID,
-                       TwoInputStreamOperator<IN1, IN2, OUT> 
taskoperatorObject, TypeInformation<IN1> in1TypeInfo,
-                       TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> 
outTypeInfo, String operatorName) {
+       public <IN1, IN2, OUT> void addCoOperator(
+                       Integer vertexID,
+                       TwoInputStreamOperator<IN1, IN2, OUT> 
taskOperatorObject,
+                       TypeInformation<IN1> in1TypeInfo,
+                       TypeInformation<IN2> in2TypeInfo,
+                       TypeInformation<OUT> outTypeInfo,
+                       String operatorName) {
 
-               addNode(vertexID, TwoInputStreamTask.class, taskoperatorObject, 
operatorName);
+               addNode(vertexID, TwoInputStreamTask.class, taskOperatorObject, 
operatorName);
 
                TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && 
!(outTypeInfo instanceof MissingTypeInfo) ?
                                outTypeInfo.createSerializer(executionConfig) : 
null;
 
                setSerializers(vertexID, 
in1TypeInfo.createSerializer(executionConfig), 
in2TypeInfo.createSerializer(executionConfig), outSerializer);
 
+               if (taskOperatorObject instanceof OutputTypeConfigurable) {
+                       @SuppressWarnings("unchecked")
+                       OutputTypeConfigurable<OUT> outputTypeConfigurable = 
(OutputTypeConfigurable<OUT>) taskOperatorObject;
+                       // sets the output type which must be known at 
StreamGraph creation time
+                       outputTypeConfigurable.setOutputType(outTypeInfo, 
executionConfig);
+               }
+
                if (LOG.isDebugEnabled()) {
                        LOG.debug("CO-TASK: {}", vertexID);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 6df8cb5..774c00b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -47,7 +47,7 @@ import java.util.Map;
  *
  * <p>
  * This traverses the tree of {@code StreamTransformations} starting from the 
sinks. At each
- * we transformation recursively transform the inputs, then create a node in 
the {@code StreamGraph}
+ * transformation we recursively transform the inputs, then create a node in 
the {@code StreamGraph}
  * and add edges from the input Nodes to our newly created node. The 
transformation methods
  * return the IDs of the nodes in the StreamGraph that represent the input 
transformation. Several
  * IDs can be returned to be able to deal with feedback transformations and 
unions.

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
new file mode 100644
index 0000000..1d05966
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Stream operators can implement this interface if they need access to the 
output type information
+ * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. 
This can be useful for
+ * cases where the output type is specified by the returns method and, thus, 
after the stream
+ * operator has been created.
+ */
+public interface OutputTypeConfigurable<OUT> {
+
+       /**
+        * Is called by the {@link 
org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, 
StreamOperator, TypeInformation, TypeInformation, String)}
+        * method when the {@link 
org.apache.flink.streaming.api.graph.StreamGraph} is generated. The
+        * method is called with the output {@link TypeInformation} which is 
also used for the
+        * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} output 
serializer.
+        *
+        * @param outTypeInfo Output type information of the {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask}
+        * @param executionConfig Execution configuration
+        */
+       void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig 
executionConfig);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
index a5e5264..81115f0 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
@@ -17,27 +17,36 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 public class StreamFold<IN, OUT>
                extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
-               implements OneInputStreamOperator<IN, OUT> {
+               implements OneInputStreamOperator<IN, OUT>, 
OutputTypeConfigurable<OUT> {
 
        private static final long serialVersionUID = 1L;
 
-       private OUT accumulator;
+       protected transient OUT accumulator;
+       private byte[] serializedInitialValue;
+
        protected TypeSerializer<OUT> outTypeSerializer;
-       protected TypeInformation<OUT> outTypeInformation;
 
-       public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue, 
TypeInformation<OUT> outTypeInformation) {
+       public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
                super(folder);
                this.accumulator = initialValue;
-               this.outTypeInformation = outTypeInformation;
                this.chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
        }
 
@@ -50,11 +59,41 @@ public class StreamFold<IN, OUT>
        @Override
        public void open(Configuration config) throws Exception {
                super.open(config);
-               this.outTypeSerializer = 
outTypeInformation.createSerializer(executionConfig);
+
+               if (serializedInitialValue == null) {
+                       throw new RuntimeException("No initial value was 
serialized for the fold " +
+                                       "operator. Probably the setOutputType 
method was not called.");
+               }
+
+               ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
+               InputViewDataInputStreamWrapper in = new 
InputViewDataInputStreamWrapper(
+                       new DataInputStream(bais)
+               );
+
+               accumulator = outTypeSerializer.deserialize(in);
        }
 
        @Override
        public void processWatermark(Watermark mark) throws Exception {
                output.emitWatermark(mark);
        }
+
+       @Override
+       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
+               outTypeSerializer = 
outTypeInfo.createSerializer(executionConfig);
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               OutputViewDataOutputStreamWrapper out = new 
OutputViewDataOutputStreamWrapper(
+                       new DataOutputStream(baos)
+               );
+
+               try {
+                       outTypeSerializer.serialize(accumulator, out);
+               } catch (IOException ioe) {
+                       throw new RuntimeException("Unable to serialize initial 
value of type " +
+                                       accumulator.getClass().getSimpleName() 
+ " of fold operator.", ioe);
+               }
+
+               serializedInitialValue = baos.toByteArray();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 5272a48..f4e44c6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -21,8 +21,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
@@ -30,28 +30,34 @@ public class StreamGroupedFold<IN, OUT> extends 
StreamFold<IN, OUT> {
        private static final long serialVersionUID = 1L;
 
        private KeySelector<IN, ?> keySelector;
-       private Map<Object, OUT> values;
-       private OUT initialValue;
+       private transient Map<Object, OUT> values;
 
-       public StreamGroupedFold(FoldFunction<IN, OUT> folder, KeySelector<IN, 
?> keySelector,
-                       OUT initialValue, TypeInformation<OUT> 
outTypeInformation) {
-               super(folder, initialValue, outTypeInformation);
+       public StreamGroupedFold(
+                       FoldFunction<IN, OUT> folder,
+                       KeySelector<IN, ?> keySelector,
+                       OUT initialValue) {
+               super(folder, initialValue);
                this.keySelector = keySelector;
-               this.initialValue = initialValue;
+       }
+
+       @Override
+       public void open(Configuration configuration) throws Exception {
+               super.open(configuration);
+
                values = new HashMap<Object, OUT>();
        }
 
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
                Object key = keySelector.getKey(element.getValue());
-               OUT accumulator = values.get(key);
+               OUT value = values.get(key);
 
-               if (accumulator != null) {
-                       OUT folded = 
userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
+               if (value != null) {
+                       OUT folded = 
userFunction.fold(outTypeSerializer.copy(value), element.getValue());
                        values.put(key, folded);
                        output.collect(element.replace(folded));
                } else {
-                       OUT first = 
userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue());
+                       OUT first = 
userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
                        values.put(key, first);
                        output.collect(element.replace(first));
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 6be011e..7533c33 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -29,17 +29,21 @@ public class StreamGroupedReduce<IN> extends 
StreamReduce<IN> {
        private static final long serialVersionUID = 1L;
 
        private KeySelector<IN, ?> keySelector;
-       private Map<Object, IN> values;
+       private transient Map<Object, IN> values;
 
        public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, 
?> keySelector) {
                super(reducer);
                this.keySelector = keySelector;
-               values = new HashMap<Object, IN>();
        }
 
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
                Object key = keySelector.getKey(element.getValue());
+
+               if (values == null) {
+                       values = new HashMap<Object, IN>();
+               }
+
                IN currentValue = values.get(key);
                if (currentValue != null) {
                        // TODO: find a way to let operators copy elements 
(maybe)

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
index 52c07d0..af562fe 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
@@ -26,7 +26,7 @@ public class StreamReduce<IN> extends 
AbstractUdfStreamOperator<IN, ReduceFuncti
 
        private static final long serialVersionUID = 1L;
 
-       private IN currentValue;
+       private transient IN currentValue;
 
        public StreamReduce(ReduceFunction<IN> reducer) {
                super(reducer);
@@ -42,7 +42,6 @@ public class StreamReduce<IN> extends 
AbstractUdfStreamOperator<IN, ReduceFuncti
                        currentValue = userFunction.reduce(currentValue, 
element.getValue());
                } else {
                        currentValue = element.getValue();
-
                }
                output.collect(element.replace(currentValue));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
new file mode 100644
index 0000000..11100a4
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamingOperatorsITCase extends 
StreamingMultipleProgramsTestBase {
+
+       private String resultPath1;
+       private String resultPath2;
+       private String expected1;
+       private String expected2;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception {
+               resultPath1 = tempFolder.newFile().toURI().toString();
+               resultPath2 = tempFolder.newFile().toURI().toString();
+               expected1 = "";
+               expected2 = "";
+       }
+
+       @After
+       public void after() throws Exception {
+               compareResultsByLinesInMemory(expected1, resultPath1);
+               compareResultsByLinesInMemory(expected2, resultPath2);
+       }
+
+       /**
+        * Tests the proper functioning of the streaming fold operator. For 
this purpose, a stream
+        * of Tuple2<Integer, Integer> is created. The stream is grouped 
according to the first tuple
+        * value. Each group is folded where the second tuple value is summed 
up.
+        *
+        * @throws Exception
+        */
+       @Test
+       public void testFoldOperation() throws Exception {
+               int numElements = 10;
+               int numKeys = 2;
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               DataStream<Tuple2<Integer, Integer>> sourceStream = 
env.addSource(new TupleSource(numElements, numKeys));
+
+               SplitDataStream<Tuple2<Integer, Integer>> splittedResult = 
sourceStream
+                       .groupBy(0)
+                       .fold(0, new FoldFunction<Tuple2<Integer, Integer>, 
Integer>() {
+                               @Override
+                               public Integer fold(Integer accumulator, 
Tuple2<Integer, Integer> value) throws Exception {
+                                       return accumulator + value.f1;
+                               }
+                       }).map(new RichMapFunction<Integer, Tuple2<Integer, 
Integer>>() {
+                               @Override
+                               public Tuple2<Integer, Integer> map(Integer 
value) throws Exception {
+                                       return new Tuple2<Integer, 
Integer>(getRuntimeContext().getIndexOfThisSubtask(), value);
+                               }
+                       }).split(new OutputSelector<Tuple2<Integer, Integer>>() 
{
+                               @Override
+                               public Iterable<String> select(Tuple2<Integer, 
Integer> value) {
+                                       List<String> output = new ArrayList<>();
+
+                                       output.add(value.f0 + "");
+
+                                       return output;
+                               }
+                       });
+
+               splittedResult.select("0").map(new 
MapFunction<Tuple2<Integer,Integer>, Integer>() {
+                       @Override
+                       public Integer map(Tuple2<Integer, Integer> value) 
throws Exception {
+                               return value.f1;
+                       }
+               }).writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+               splittedResult.select("1").map(new MapFunction<Tuple2<Integer, 
Integer>, Integer>() {
+                       @Override
+                       public Integer map(Tuple2<Integer, Integer> value) 
throws Exception {
+                               return value.f1;
+                       }
+               }).writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
+
+               StringBuilder builder1 = new StringBuilder();
+               StringBuilder builder2 = new StringBuilder();
+               int counter1 = 0;
+               int counter2 = 0;
+
+               for (int i = 0; i < numElements; i++) {
+                       if (i % 2 == 0) {
+                               counter1 += i;
+                               builder1.append(counter1 + "\n");
+                       } else {
+                               counter2 += i;
+                               builder2.append(counter2 + "\n");
+                       }
+               }
+
+               expected1 = builder1.toString();
+               expected2 = builder2.toString();
+
+               env.execute();
+       }
+
+       /**
+        * Tests whether the fold operation can also be called with 
non-Java-serializable types.
+        */
+       @Test
+       public void testFoldOperationWithNonJavaSerializableType() throws 
Exception {
+               final int numElements = 10;
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               DataStream<Tuple2<Integer, NonSerializable>> input = 
env.addSource(new NonSerializableTupleSource(numElements));
+
+               input
+                       .groupBy(0)
+                       .fold(
+                               new NonSerializable(42),
+                               new FoldFunction<Tuple2<Integer, 
NonSerializable>, NonSerializable>() {
+                                       @Override
+                                       public NonSerializable 
fold(NonSerializable accumulator, Tuple2<Integer, NonSerializable> value) 
throws Exception {
+                                               return new 
NonSerializable(accumulator.value + value.f1.value);
+                                       }
+                       })
+                       .map(new MapFunction<NonSerializable, Integer>() {
+                               @Override
+                               public Integer map(NonSerializable value) 
throws Exception {
+                                       return value.value;
+                               }
+                       })
+                       .writeAsText(resultPath1, 
FileSystem.WriteMode.OVERWRITE);
+
+               StringBuilder builder = new StringBuilder();
+
+               for (int i = 0; i < numElements; i++) {
+                       builder.append(42 + i + "\n");
+               }
+
+               expected1 = builder.toString();
+
+               env.execute();
+       }
+
+       private static class NonSerializable {
+               // This makes the type non-serializable
+               private final Object obj = new Object();
+
+               private final int value;
+
+               public NonSerializable(int value) {
+                       this.value = value;
+               }
+       }
+
+       private static class NonSerializableTupleSource implements 
SourceFunction<Tuple2<Integer, NonSerializable>> {
+               private final int numElements;
+
+               public NonSerializableTupleSource(int numElements) {
+                       this.numElements = numElements;
+               }
+
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, NonSerializable>> 
ctx) throws Exception {
+                       for (int i = 0; i < numElements; i++) {
+                               ctx.collect(new Tuple2<Integer, 
NonSerializable>(i, new NonSerializable(i)));
+                       }
+               }
+
+               @Override
+               public void cancel() {}
+       }
+
+       private static class TupleSource implements 
SourceFunction<Tuple2<Integer, Integer>> {
+
+               private final int numElements;
+               private final int numKeys;
+
+               public TupleSource(int numElements, int numKeys) {
+                       this.numElements = numElements;
+                       this.numKeys = numKeys;
+               }
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, Integer>> ctx) 
throws Exception {
+                       for (int i = 0; i < numElements; i++) {
+                               Tuple2<Integer, Integer> result = new 
Tuple2<>(i % numKeys, i);
+                               ctx.collect(result);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index fb2ef56..3b05274 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -17,18 +17,29 @@
  */
 package org.apache.flink.streaming.api.graph;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.apache.flink.streaming.util.EvenOddOutputSelector;
 import org.apache.flink.streaming.util.NoOpIntMap;
 import org.apache.flink.streaming.util.NoOpSink;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -176,4 +187,120 @@ public class StreamGraphGeneratorTest extends 
StreamingMultipleProgramsTestBase
 
        }
 
+       /**
+        * Test whether an {@link OutputTypeConfigurable} implementation gets called with the correct
+        * output type. In this test case the output type must be BasicTypeInfo.INT_TYPE_INFO.
+        */
+       @Test
+       public void testOutputTypeConfigurationWithOneInputTransformation() 
throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Integer> source = env.fromElements(1, 10);
+
+               OutputTypeConfigurableOperationWithOneInput 
outputTypeConfigurableOperation = new 
OutputTypeConfigurableOperationWithOneInput();
+
+               DataStream<Integer> result = source.transform(
+                       "Single input and output type configurable operation",
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       outputTypeConfigurableOperation);
+
+               result.addSink(new NoOpSink<Integer>());
+
+               // Building the StreamGraph is what triggers setOutputType() on the
+               // operator; the graph object itself is intentionally not inspected.
+               StreamGraph graph = env.getStreamGraph();
+
+               assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
outputTypeConfigurableOperation.getTypeInformation());
+       }
+
+       /**
+        * Same check as the one-input case, but for a {@link TwoInputStreamOperator}:
+        * the output type handed to setOutputType must be BasicTypeInfo.INT_TYPE_INFO.
+        */
+       @Test
+       public void testOutputTypeConfigurationWithTwoInputTransformation() 
throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Integer> source1 = env.fromElements(1, 10);
+               DataStream<Integer> source2 = env.fromElements(2, 11);
+
+               ConnectedDataStream<Integer, Integer> connectedSource = 
source1.connect(source2);
+
+               OutputTypeConfigurableOperationWithTwoInputs 
outputTypeConfigurableOperation = new 
OutputTypeConfigurableOperationWithTwoInputs();
+
+               DataStream<Integer> result = connectedSource.transform(
+                               "Two input and output type configurable 
operation",
+                               BasicTypeInfo.INT_TYPE_INFO,
+                               outputTypeConfigurableOperation);
+
+               result.addSink(new NoOpSink<Integer>());
+
+               // Building the StreamGraph is what triggers setOutputType() on the
+               // operator; the graph object itself is intentionally not inspected.
+               StreamGraph graph = env.getStreamGraph();
+
+               assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
outputTypeConfigurableOperation.getTypeInformation());
+       }
+
+       /**
+        * Pass-through two-input operator that records the TypeInformation it
+        * receives via {@link OutputTypeConfigurable#setOutputType}, so tests can
+        * assert which output type the StreamGraph generator injected.
+        */
+       private static class OutputTypeConfigurableOperationWithTwoInputs
+                       extends AbstractStreamOperator<Integer>
+                       implements TwoInputStreamOperator<Integer, Integer, 
Integer>, OutputTypeConfigurable<Integer> {
+
+               // Captured output type; null until setOutputType has been called.
+               TypeInformation<Integer> tpeInformation;
+
+               public TypeInformation<Integer> getTypeInformation() {
+                       return tpeInformation;
+               }
+
+               @Override
+               public void setOutputType(TypeInformation<Integer> outTypeInfo, 
ExecutionConfig executionConfig) {
+                       tpeInformation = outTypeInfo;
+               }
+
+               // NOTE(review): raw StreamRecord parameters (unparameterized) --
+               // elements are forwarded unchanged on both inputs.
+               @Override
+               public void processElement1(StreamRecord element) throws 
Exception {
+                       output.collect(element);
+               }
+
+               @Override
+               public void processElement2(StreamRecord element) throws 
Exception {
+                       output.collect(element);
+               }
+
+               @Override
+               public void processWatermark1(Watermark mark) throws Exception {
+                       // watermarks are irrelevant to this test; dropped
+               }
+
+               @Override
+               public void processWatermark2(Watermark mark) throws Exception {
+                       // watermarks are irrelevant to this test; dropped
+               }
+
+               @Override
+               public void setup(Output output, StreamingRuntimeContext 
runtimeContext) {
+                       // no runtime setup needed; only setOutputType is under test
+               }
+       }
+
+       /**
+        * Pass-through one-input operator that records the TypeInformation it
+        * receives via {@link OutputTypeConfigurable#setOutputType}, so tests can
+        * assert which output type the StreamGraph generator injected.
+        */
+       private static class OutputTypeConfigurableOperationWithOneInput
+                       extends AbstractStreamOperator<Integer>
+                       implements OneInputStreamOperator<Integer, Integer>, 
OutputTypeConfigurable<Integer> {
+
+               // Captured output type; null until setOutputType has been called.
+               TypeInformation<Integer> tpeInformation;
+
+               public TypeInformation<Integer> getTypeInformation() {
+                       return tpeInformation;
+               }
+
+               @Override
+               public void processElement(StreamRecord<Integer> element) 
throws Exception {
+                       output.collect(element);
+               }
+
+               @Override
+               public void processWatermark(Watermark mark) throws Exception {
+                       // watermarks are irrelevant to this test; dropped
+               }
+
+               @Override
+               public void setOutputType(TypeInformation<Integer> outTypeInfo, 
ExecutionConfig executionConfig) {
+                       tpeInformation = outTypeInfo;
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
index dcfe3de..82dddfe 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.RichFoldFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -69,7 +70,9 @@ public class StreamGroupedFoldTest {
                        public String getKey(Integer value) throws Exception {
                                return value.toString();
                        }
-               }, "100", outType);
+               }, "100");
+
+               operator.setOutputType(outType, new ExecutionConfig());
 
                OneInputStreamOperatorTestHarness<Integer, String> testHarness 
= new OneInputStreamOperatorTestHarness<Integer, String>(operator);
 
@@ -104,7 +107,10 @@ public class StreamGroupedFoldTest {
                        public Integer getKey(Integer value) throws Exception {
                                return value;
                        }
-               }, "init", BasicTypeInfo.STRING_TYPE_INFO);
+               }, "init");
+
+               operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new 
ExecutionConfig());
+
                OneInputStreamOperatorTestHarness<Integer, String> testHarness 
= new OneInputStreamOperatorTestHarness<Integer, String>(operator);
 
                long initialTime = 0L;

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
new file mode 100644
index 0000000..3342e1e
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.streaming.util.TestStreamEnvironment
+import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.junit.JUnitSuiteLike
+
+trait ScalaStreamingMultipleProgramsTestBase
+  extends TestBaseUtils
+  with JUnitSuiteLike
+  with BeforeAndAfterAll {
+
+  val parallelism = 4
+
+  // Mini cluster shared by all tests of the suite; started in beforeAll and
+  // torn down in afterAll.
+  var cluster: Option[ForkableFlinkMiniCluster] = None
+
+  override protected def beforeAll(): Unit = {
+    // Assign the field directly. A local `val cluster = Some(...)` here would
+    // shadow the field, leaving it None so that afterAll never stops the
+    // cluster and the mini cluster leaks.
+    cluster = Some(
+      TestBaseUtils.startCluster(
+        1,
+        parallelism,
+        StreamingMode.STREAMING,
+        false,
+        false,
+        true
+      )
+    )
+
+    // NOTE(review): the environment is only constructed, never referenced
+    // again -- presumably the TestStreamEnvironment constructor registers
+    // itself as the context environment for subsequent jobs; confirm.
+    val clusterEnvironment = new TestStreamEnvironment(cluster.get, parallelism)
+  }
+
+  override protected def afterAll(): Unit = {
+    cluster.foreach {
+      TestBaseUtils.stopCluster(_, TestBaseUtils.DEFAULT_TIMEOUT)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
new file mode 100644
index 0000000..d5e2b7b
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.common.functions.{RichMapFunction, FoldFunction}
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.rules.TemporaryFolder
+import org.junit.{After, Before, Rule, Test}
+
+class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
+
+  // Output files for the two key groups and the expected contents of each;
+  // all four are (re)initialized per test in before().
+  var resultPath1: String = _
+  var resultPath2: String = _
+  var expected1: String = _
+  var expected2: String = _
+
+  val _tempFolder = new TemporaryFolder()
+
+  // JUnit requires the @Rule annotation on a public accessor in Scala.
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    val temp = tempFolder
+    resultPath1 = temp.newFile.toURI.toString
+    resultPath2 = temp.newFile.toURI.toString
+    expected1 = ""
+    expected2 = ""
+  }
+
+  // Verification happens after each test: compare the files written by the
+  // job against the expected strings computed inside the test body.
+  @After
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected1, resultPath1)
+    TestBaseUtils.compareResultsByLinesInMemory(expected2, resultPath2)
+  }
+
+  /** Tests the streaming fold operation. For this purpose a stream of 
Tuple[Int, Int] is created.
+    * The stream is grouped by the first field. For each group, the resulting 
stream is folded by
+    * summing up the second tuple field.
+    *
+    */
+  @Test
+  def testFoldOperator(): Unit = {
+    val numElements = 10
+    val numKeys = 2
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val sourceStream = env.addSource(new SourceFunction[(Int, Int)] {
+
+      // Emits (i % numKeys, i) for i in [0, numElements).
+      override def run(ctx: SourceContext[(Int, Int)]): Unit = {
+        0 until numElements foreach {
+          i => ctx.collect((i % numKeys, i))
+        }
+      }
+
+      override def cancel(): Unit = {}
+    })
+
+    val splittedResult = sourceStream
+      .groupBy(0)
+      .fold(0, new FoldFunction[(Int, Int), Int] {
+        override def fold(accumulator: Int, value: (Int, Int)): Int = {
+          accumulator + value._2
+        }
+      })
+      // Tag each fold result with the subtask that produced it, then split the
+      // stream by that subtask index.
+      .map(new RichMapFunction[Int, (Int, Int)] {
+        override def map(value: Int): (Int, Int) = {
+          (getRuntimeContext.getIndexOfThisSubtask, value)
+        }
+      })
+      .split{
+        x =>
+          Seq(x._1.toString)
+      }
+
+    // NOTE(review): selecting by subtask index assumes key 0 is routed to
+    // subtask 0 and key 1 to subtask 1 at parallelism 2 -- this depends on the
+    // key-to-subtask assignment staying stable; confirm.
+    splittedResult
+      .select("0")
+      .map(_._2)
+      .getJavaStream
+      .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE)
+    splittedResult
+      .select("1")
+      .map(_._2)
+      .getJavaStream
+      .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE)
+
+    // Expected per-group running sums: scanLeft produces the prefix sums of
+    // each key group; drop the leading 0 seed with .tail.
+    val groupedSequence = 0 until numElements groupBy( _ % numKeys)
+
+    expected1 = groupedSequence(0).scanLeft(0)(_ + _).tail.mkString("\n")
+    expected2 = groupedSequence(1).scanLeft(0)(_ + _).tail.mkString("\n")
+
+    env.execute()
+  }
+}

Reply via email to