Repository: flink
Updated Branches:
  refs/heads/master 2eb5cfeb2 -> 31d3eaa74


[FLINK-2182] Add stateful Streaming Sequence Source

Closes #804


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31d3eaa7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31d3eaa7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31d3eaa7

Branch: refs/heads/master
Commit: 31d3eaa7468a98334dd1c3fa049c9fad1c3b6ccf
Parents: 2eb5cfe
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Jun 8 12:50:38 2015 +0200
Committer: mbalassi <mbala...@apache.org>
Committed: Tue Jun 9 18:49:41 2015 +0200

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java | 25 ++----
 .../source/StatefulSequenceSource.java          | 88 ++++++++++++++++++++
 .../flink/streaming/api/DataStreamTest.java     |  2 +-
 .../flink/streaming/api/SourceFunctionTest.java | 19 ++++-
 .../api/StreamExecutionEnvironmentTest.java     | 43 ++--------
 .../api/collector/DirectedOutputTest.java       |  2 +-
 .../api/complex/ComplexIntegrationTest.java     | 27 ++++--
 .../streaming/api/operators/ProjectTest.java    |  2 +-
 .../api/streamtask/StreamVertexTest.java        |  2 +-
 .../apache/flink/streaming/util/MockSource.java | 55 ------------
 .../streaming/util/SourceFunctionUtil.java      | 65 +++++++++++++++
 .../api/scala/StreamExecutionEnvironment.scala  | 18 +---
 .../streaming/api/scala/DataStreamTest.scala    |  2 +-
 .../ProcessFailureStreamingRecoveryITCase.java  |  3 +-
 14 files changed, 215 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 60b849c..f009871 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -57,11 +57,11 @@ import 
org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFun
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.types.StringValue;
-import org.apache.flink.util.NumberSequenceIterator;
 import org.apache.flink.util.SplittableIterator;
 
 import java.io.File;
@@ -399,8 +399,10 @@ public abstract class StreamExecutionEnvironment {
        // 
--------------------------------------------------------------------------------------------
 
        /**
-        * Creates a new data stream that contains a sequence of numbers. The 
data stream will be created with parallelism
-        * one, so the order of the elements is guaranteed.
+        * Creates a new data stream that contains a sequence of numbers. This 
is a parallel source,
+        * if you manually set the parallelism to {@code 1}
+        * (using {@link 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.setParallelism()})
+        * the generated sequence of elements is in order.
         *
         * @param from
         *              The number to start at (inclusive)
@@ -412,22 +414,7 @@ public abstract class StreamExecutionEnvironment {
                if (from > to) {
                        throw new IllegalArgumentException("Start of sequence 
must not be greater than the end");
                }
-               return fromCollection(new NumberSequenceIterator(from, to), 
BasicTypeInfo.LONG_TYPE_INFO, "Sequence Source");
-       }
-
-       /**
-        * Creates a new data stream that contains a sequence of numbers. The 
data stream will be created in parallel, so
-        * there is no guarantee about the oder of the elements.
-        *
-        * @param from
-        *              The number to start at (inclusive)
-        * @param to
-        *              The number to stop at (inclusive)
-        * @return A data stream, containing all number in the [from, to] 
interval
-        */
-       public DataStreamSource<Long> generateParallelSequence(long from, long 
to) {
-               return fromParallelCollection(new NumberSequenceIterator(from, 
to), BasicTypeInfo.LONG_TYPE_INFO, "Parallel " +
-                               "Sequence Source");
+               return addSource(new StatefulSequenceSource(from, to), 
"Sequence Source");
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
new file mode 100644
index 0000000..e213363
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+
+/**
+ * A stateful streaming source that emits each number from a given interval 
exactly once,
+ * possibly in parallel.
+ */
+public class StatefulSequenceSource extends RichParallelSourceFunction<Long> 
implements Checkpointed<Long> {
+       private static final long serialVersionUID = 1L;
+
+       private final long start;
+       private final long end;
+
+       private long collected;
+
+       private volatile boolean isRunning = true;
+
+       /**
+        * Creates a source that emits all numbers from the given interval 
exactly once.
+        *
+        * @param start Start of the range of numbers to emit.
+        * @param end End of the range of numbers to emit.
+        */
+       public StatefulSequenceSource(long start, long end) {
+               this.start = start;
+               this.end = end;
+               this.collected = 0;
+       }
+
+       @Override
+       public void run(SourceContext<Long> ctx) throws Exception {
+               final Object checkpointLock = ctx.getCheckpointLock();
+
+               RuntimeContext context = getRuntimeContext();
+
+               final long stepSize = context.getNumberOfParallelSubtasks();
+               final long congruence = start + context.getIndexOfThisSubtask();
+
+               final long toCollect =
+                               ((end - start + 1) % stepSize > (congruence - 
start)) ?
+                                       ((end - start + 1) / stepSize + 1) :
+                                       ((end - start + 1) / stepSize);
+
+               while (isRunning && collected < toCollect) {
+
+                       synchronized (checkpointLock) {
+                               ctx.collect(collected * stepSize + congruence);
+                               collected++;
+                       }
+               }
+       }
+
+       @Override
+       public void cancel() {
+               isRunning = false;
+       }
+
+       @Override
+       public Long snapshotState(long checkpointId, long checkpointTimestamp) 
throws Exception {
+               return collected;
+       }
+
+       @Override
+       public void restoreState(Long state) {
+               collected = state;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 58f7eba..ac04ce8 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -295,7 +295,7 @@ public class DataStreamTest {
                } catch (IllegalArgumentException success) {
                }
 
-               DataStreamSource<Long> parallelSource = 
env.generateParallelSequence(0, 0);
+               DataStreamSource<Long> parallelSource = env.generateSequence(0, 
0);
                assertEquals(7, 
graph.getStreamNode(parallelSource.getId()).getParallelism());
 
                parallelSource.setParallelism(3);

http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
index 1bfb13a..7a78205 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
@@ -24,7 +24,8 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.util.MockSource;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
+import org.apache.flink.streaming.util.SourceFunctionUtil;
 import org.junit.Test;
 
 public class SourceFunctionTest {
@@ -32,20 +33,30 @@ public class SourceFunctionTest {
        @Test
        public void fromElementsTest() throws Exception {
                List<Integer> expectedList = Arrays.asList(1, 2, 3);
-               List<Integer> actualList = MockSource.createAndExecute(new 
FromElementsFunction<Integer>(1,
-                               2, 3));
+               List<Integer> actualList = 
SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>(
+                               1,
+                               2,
+                               3));
                assertEquals(expectedList, actualList);
        }
 
        @Test
        public void fromCollectionTest() throws Exception {
                List<Integer> expectedList = Arrays.asList(1, 2, 3);
-               List<Integer> actualList = MockSource.createAndExecute(new 
FromElementsFunction<Integer>(
+               List<Integer> actualList = 
SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>(
                                Arrays.asList(1, 2, 3)));
                assertEquals(expectedList, actualList);
        }
 
        @Test
+       public void generateSequenceTest() throws Exception {
+               List<Long> expectedList = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 
7L);
+               List<Long> actualList = 
SourceFunctionUtil.runSourceFunction(new StatefulSequenceSource(1,
+                               7));
+               assertEquals(expectedList, actualList);
+       }
+
+       @Test
        public void socketTextStreamTest() throws Exception {
                // TODO: does not work because we cannot set the internal 
socket anymore
 //             List<String> expectedList = Arrays.asList("a", "b", "c");

http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index c4a3b69..7373276 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -32,9 +31,9 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
 import 
org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -74,32 +73,11 @@ public class StreamExecutionEnvironmentTest {
        }
 
        @Test
-       public void testGenerateSequenceParallelism() throws Exception {
-               StreamExecutionEnvironment env = new 
TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
-               boolean seenExpectedException = false;
-
-               try {
-                       DataStream<Long> dataStream1 = env.generateSequence(0, 
0).setParallelism(4);
-               } catch (IllegalArgumentException e) {
-                       seenExpectedException = true;
-               }
-
-               DataStream<Long> dataStream2 = env.generateParallelSequence(0, 
0).setParallelism(4);
-
-               String plan = env.getExecutionPlan();
-
-               assertTrue("Expected Exception for setting parallelism was not 
thrown.", seenExpectedException);
-               assertTrue("Parallelism for dataStream1 is not right.",
-                               plan.contains("\"contents\":\"Sequence 
Source\",\"parallelism\":1"));
-               assertTrue("Parallelism for dataStream2 is not right.",
-                               plan.contains("\"contents\":\"Parallel Sequence 
Source\",\"parallelism\":4"));
-       }
-
-       @Test
        public void testSources() {
                StreamExecutionEnvironment env = new 
TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
 
                SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
 
                        @Override
                        public void run(SourceContext<Integer> ctx) throws 
Exception {
@@ -110,21 +88,18 @@ public class StreamExecutionEnvironmentTest {
                        }
                };
                DataStreamSource<Integer> src1 = env.addSource(srcFun);
-               assertEquals(srcFun, getFunctionForDataSource(src1));
+               assertEquals(srcFun, getFunctionFromDataSource(src1));
 
                List<Long> list = Arrays.asList(0L, 1L, 2L);
 
                DataStreamSource<Long> src2 = env.generateSequence(0, 2);
-               assertTrue(getFunctionForDataSource(src2) instanceof 
FromIteratorFunction);
+               assertTrue(getFunctionFromDataSource(src2) instanceof 
StatefulSequenceSource);
 
                DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
-               assertTrue(getFunctionForDataSource(src3) instanceof 
FromElementsFunction);
+               assertTrue(getFunctionFromDataSource(src3) instanceof 
FromElementsFunction);
 
                DataStreamSource<Long> src4 = env.fromCollection(list);
-               assertTrue(getFunctionForDataSource(src4) instanceof 
FromElementsFunction);
-
-               DataStreamSource<Long> src5 = env.generateParallelSequence(0, 
2);
-               assertTrue(getFunctionForDataSource(src5) instanceof 
FromSplittableIteratorFunction);
+               assertTrue(getFunctionFromDataSource(src4) instanceof 
FromElementsFunction);
        }
 
        /////////////////////////////////////////////////////////////
@@ -132,15 +107,15 @@ public class StreamExecutionEnvironmentTest {
        /////////////////////////////////////////////////////////////
 
 
-       private static StreamOperator<?> getOperatorForDataStream(DataStream<?> 
dataStream) {
+       private static StreamOperator<?> 
getOperatorFromDataStream(DataStream<?> dataStream) {
                StreamExecutionEnvironment env = 
dataStream.getExecutionEnvironment();
                StreamGraph streamGraph = env.getStreamGraph();
                return 
streamGraph.getStreamNode(dataStream.getId()).getOperator();
        }
 
-       private static <T> SourceFunction<T> 
getFunctionForDataSource(DataStreamSource<T> dataStreamSource) {
+       private static <T> SourceFunction<T> 
getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
                AbstractUdfStreamOperator<?, ?> operator =
-                               (AbstractUdfStreamOperator<?, ?>) 
getOperatorForDataStream(dataStreamSource);
+                               (AbstractUdfStreamOperator<?, ?>) 
getOperatorFromDataStream(dataStreamSource);
                return (SourceFunction<T>) operator.getUserFunction();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 56b6ae8..fc3e36f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -101,7 +101,7 @@ public class DirectedOutputTest {
                TestListResultSink<Long> evenAndOddSink = new 
TestListResultSink<Long>();
                TestListResultSink<Long> allSink = new 
TestListResultSink<Long>();
 
-               SplitDataStream<Long> source = env.generateParallelSequence(1, 
11).split(new MyOutputSelector());
+               SplitDataStream<Long> source = env.generateSequence(1, 
11).split(new MyOutputSelector());
                source.select(EVEN).addSink(evenSink);
                source.select(ODD, TEN).addSink(oddAndTenSink);
                source.select(EVEN, ODD).addSink(evenAndOddSink);

http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 421d4ac..5433778 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -207,6 +207,11 @@ public class ComplexIntegrationTest extends 
StreamingMultipleProgramsTestBase {
                                "16937\n" + "11927\n" + "9973\n" + "14431\n" + 
"19507\n" + "12497\n" + "17497\n" + "14983\n" +
                                "19997\n";
 
+               expected1 = "541\n" + "1223\n" + "1987\n" + "2741\n" + "3571\n" 
+ "10939\n" + "4409\n" +
+                               "5279\n" + "11927\n" + "6133\n" + "6997\n" + 
"12823\n" + "7919\n" + "8831\n" +
+                               "13763\n" + "9733\n" + "9973\n" + "14759\n" + 
"15671\n" + "16673\n" + "17659\n" +
+                               "18617\n" + "19697\n" + "19997\n";
+
                for (int i = 2; i < 100; i++) {
                        expected2 += "(" + i + "," + 20000 / i + ")\n";
                }
@@ -217,11 +222,15 @@ public class ComplexIntegrationTest extends 
StreamingMultipleProgramsTestBase {
                expected2 += "(" + 20000 + "," + 1 + ")";
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               
+
+               // set to parallelism 1 because otherwise we don't know which 
elements go to which parallel
+               // count-window.
+               env.setParallelism(1);
+
                env.setBufferTimeout(0);
 
-               DataStream<Long> sourceStream31 = 
env.generateParallelSequence(1, 10000);
-               DataStream<Long> sourceStream32 = 
env.generateParallelSequence(10001, 20000);
+               DataStream<Long> sourceStream31 = env.generateSequence(1, 
10000);
+               DataStream<Long> sourceStream32 = env.generateSequence(10001, 
20000);
 
                sourceStream31.filter(new PrimeFilterFunction())
                                .window(Count.of(100))
@@ -299,14 +308,18 @@ public class ComplexIntegrationTest extends 
StreamingMultipleProgramsTestBase {
                //Turning on and off chaining
 
                expected1 = "1\n" + "2\n" + "2\n" + "3\n" + "3\n" + "3\n" + 
"4\n" + "4\n" + "4\n" + "4\n" + "5\n" + "5\n" +
-                               "5\n" + "5\n" + "5\n" + "1\n" + "3\n" + "3\n" + 
"4\n" + "5\n" + "5\n" + "6\n" + "8\n" + "9\n" + "10\n" +
-                               "12\n" + "15\n" + "16\n" + "20\n" + "25\n";
+                               "5\n" + "5\n" + "5\n" + "1\n" + "3\n" + "5\n" + 
"8\n" + "11\n" + "14\n" + "18\n" + "22\n" + "26\n" +
+                               "30\n" + "35\n" + "40\n" + "45\n" + "50\n" + 
"55\n";
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               // Set to parallelism 1 to make it deterministic, otherwise, it 
is not clear which
+               // elements will go to which parallel instance of the fold
+               env.setParallelism(1);
                
                env.setBufferTimeout(0);
 
-               DataStream<Long> dataStream51 = env.generateParallelSequence(1, 
5)
+               DataStream<Long> dataStream51 = env.generateSequence(1, 5)
                                .map(new MapFunction<Long, Long>() {
 
                                        @Override
@@ -346,6 +359,8 @@ public class ComplexIntegrationTest extends 
StreamingMultipleProgramsTestBase {
                });
 
 
+               dataStream53.union(dataStream52).print();
+
                dataStream53.union(dataStream52)
                                .writeAsText(resultPath1, 
FileSystem.WriteMode.OVERWRITE);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
index c09afee..d9cc607 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
@@ -92,7 +92,7 @@ public class ProjectTest implements Serializable {
 
                StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
MEMORY_SIZE);
 
-               env.generateParallelSequence(1, 10).map(new MapFunction<Long, 
Tuple3<Long, Character, Double>>() {
+               env.generateSequence(1, 10).map(new MapFunction<Long, 
Tuple3<Long, Character, Double>>() {
                                @Override
                                public Tuple3<Long, Character, Double> map(Long 
value) throws Exception {
                                        return new Tuple3<Long, Character, 
Double>(value, 'c', value.doubleValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
index 9085034..f45125b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
@@ -158,7 +158,7 @@ public class StreamVertexTest {
                StreamExecutionEnvironment env = new 
TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
 
                DataStream<String> fromStringElements = env.fromElements("aa", 
"bb", "cc");
-               DataStream<Long> generatedSequence = 
env.generateParallelSequence(0, 3);
+               DataStream<Long> generatedSequence = env.generateSequence(0, 3);
 
                fromStringElements.connect(generatedSequence).map(new 
CoMap()).addSink(new SetSink());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
deleted file mode 100644
index 5bf3b61..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class MockSource<T> {
-
-       public static <T> List<T> createAndExecute(SourceFunction<T> 
sourceFunction) throws Exception {
-               List<T> outputs = new ArrayList<T>();
-               if (sourceFunction instanceof RichSourceFunction) {
-                       ((RichSourceFunction<T>) sourceFunction).open(new 
Configuration());
-               }
-               try {
-                       final Collector<T> collector = new 
MockOutput<T>(outputs);
-                       final Object lockObject = new Object();
-                       SourceFunction.SourceContext<T> ctx = new 
SourceFunction.SourceContext<T>() {
-                               @Override
-                               public void collect(T element) {
-                                       collector.collect(element);
-                               }
-
-                               @Override
-                               public Object getCheckpointLock() {
-                                       return lockObject;
-                               }
-                       };
-                       sourceFunction.run(ctx);
-               } catch (Exception e) {
-                       throw new RuntimeException("Cannot invoke source.", e);
-               }
-               return outputs;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
new file mode 100644
index 0000000..5cc5346
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+
+public class SourceFunctionUtil<T> {
+
+       public static <T> List<T> runSourceFunction(SourceFunction<T> 
sourceFunction) throws Exception {
+               List<T> outputs = new ArrayList<T>();
+               if (sourceFunction instanceof RichFunction) {
+                       RuntimeContext runtimeContext =  new 
StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new 
MockInputSplitProvider(), 1024), null,
+                                       new ExecutionConfig());
+                       ((RichFunction) 
sourceFunction).setRuntimeContext(runtimeContext);
+
+                       ((RichFunction) sourceFunction).open(new 
Configuration());
+               }
+               try {
+                       final Collector<T> collector = new 
MockOutput<T>(outputs);
+                       final Object lockObject = new Object();
+                       SourceFunction.SourceContext<T> ctx = new 
SourceFunction.SourceContext<T>() {
+                               @Override
+                               public void collect(T element) {
+                                       collector.collect(element);
+                               }
+
+                               @Override
+                               public Object getCheckpointLock() {
+                                       return lockObject;
+                               }
+                       };
+                       sourceFunction.run(ctx);
+               } catch (Exception e) {
+                       throw new RuntimeException("Cannot invoke source.", e);
+               }
+               return outputs;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 008ad6c..7371c91 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -232,22 +232,12 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   // 
--------------------------------------------------------------------------------------------
 
   /**
-   * Creates a new DataStream that contains a sequence of numbers.
-   *
-   * Note that this operation will result in a non-parallel data source, i.e. 
a data source with
-   * a parallelism of one.
+   * Creates a new DataStream that contains a sequence of numbers. This source 
is a parallel source.
+   * If you manually set the parallelism to `1` the emitted elements are in 
order.
    */
   def generateSequence(from: Long, to: Long): DataStream[Long] = {
-    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
-      asInstanceOf[DataStream[Long]]
-  }
-
-  /**
-   * Creates a new DataStream that contains a sequence of numbers in a 
parallel fashion.
-   */
-  def generateParallelSequence(from: Long, to: Long): DataStream[Long] = {
-    new DataStream[java.lang.Long](javaEnv.generateParallelSequence(from, to)).
-      asInstanceOf[DataStream[Long]]
+    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to))
+      .asInstanceOf[DataStream[Long]]
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 5e348eb..7746bf5 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -228,7 +228,7 @@ class DataStreamTest {
     assert(7 == graph.getStreamNode(windowed.getId).getParallelism)
     assert(7 == graph.getStreamNode(sink.getId).getParallelism)
 
-    val parallelSource = env.generateParallelSequence(0, 0)
+    val parallelSource = env.generateSequence(0, 0)
 
     assert(7 == graph.getStreamNode(parallelSource.getId).getParallelism)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31d3eaa7/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index 626b1d1..f153eb7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FileStateHandle;
@@ -126,7 +127,7 @@ public class ProcessFailureStreamingRecoveryITCase extends 
AbstractProcessFailur
                public void run(SourceContext<Long> sourceCtx) throws Exception 
{
                        final Object checkpointLock = 
sourceCtx.getCheckpointLock();
 
-                       StreamingRuntimeContext runtimeCtx = 
(StreamingRuntimeContext) getRuntimeContext();
+                       RuntimeContext runtimeCtx = getRuntimeContext();
 
                        final long stepSize = 
runtimeCtx.getNumberOfParallelSubtasks();
                        final long congruence = 
runtimeCtx.getIndexOfThisSubtask();

Reply via email to