[scala] [streaming] Added support for iterative streams for scala api

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f165c353
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f165c353
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f165c353

Branch: refs/heads/master
Commit: f165c353c6b41396eaecb11b6cda274d818cb553
Parents: 7aa6829
Author: Gyula Fora <[email protected]>
Authored: Thu Dec 18 22:46:00 2014 +0100
Committer: Gyula Fora <[email protected]>
Committed: Fri Jan 2 18:34:38 2015 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  4 +-
 .../streaming/api/datastream/DataStream.java    | 38 ++++++++--
 .../api/datastream/IterativeDataStream.java     | 18 +----
 .../apache/flink/streaming/api/IterateTest.java |  2 +-
 .../examples/iteration/IterateExample.java      |  7 +-
 .../flink/api/scala/streaming/DataStream.scala  | 74 ++++++++------------
 .../streaming/StreamExecutionEnvironment.scala  |  9 +--
 7 files changed, 73 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f165c353/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index c51afbc..bea0907 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -509,7 +509,7 @@ Unlike in the core API the user does not define the maximum 
number of iterations
 To start an iterative part of the program the user defines the iteration 
starting point:
 
 ~~~java
-IterativeDataStream<Integer> iteration = source.iterate();
+IterativeDataStream<Integer> iteration = source.iterate(maxWaitTimeMillis);
 ~~~
 The operator applied on the iteration starting point is the head of the 
iteration, where data is fed back from the iteration tail.
 
@@ -529,7 +529,7 @@ iteration.closeWith(tailOperator.select("iterate"));
 In these case all output directed to the “iterate” edge would be fed back 
to the iteration head.
 
 Because iterative streaming programs do not have a set number of iterations 
for each data element, the streaming program has no information on the end of 
its input. From this it follows that iterative streaming programs run until the 
user manually stops the program. While this is acceptable under normal 
circumstances a method is provided to allow iterative programs to shut down 
automatically if no input received by the iteration head for a predefined 
number of milliseconds.
-To use this function the user needs to call, the 
`iteration.setMaxWaitTime(millis)` to control the max wait time. 
+To use this functionality the user needs to add the maxWaitTimeMillis 
parameter to the `dataStream.iterate(…)` call to control the max wait time. 
 
 ### Rich functions
 The usage of rich functions are essentially the same as in the core Flink API. 
All transformations that take as argument a user-defined function can instead 
take a rich function as argument:

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f165c353/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 3a3cdc4..a236312 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -383,14 +383,44 @@ public class DataStream<OUT> {
         * the iteration head.
         * <p>
         * By default a DataStream with iteration will never terminate, but the 
user
-        * can use the {@link IterativeDataStream#setMaxWaitTime} call to set a 
max
-        * waiting time for the iteration head. If no data received in the set 
time,
-        * the stream terminates.
+        * can use the the maxWaitTime parameter to set a max waiting time for 
the
+        * iteration head. If no data received in the set time, the stream
+        * terminates.
         * 
         * @return The iterative data stream created.
         */
        public IterativeDataStream<OUT> iterate() {
-               return new IterativeDataStream<OUT>(this);
+               return new IterativeDataStream<OUT>(this, 0);
+       }
+
+       /**
+        * Initiates an iterative part of the program that feeds back data 
streams.
+        * The iterative part needs to be closed by calling
+        * {@link IterativeDataStream#closeWith(DataStream)}. The 
transformation of
+        * this IterativeDataStream will be the iteration head. The data stream
+        * given to the {@link IterativeDataStream#closeWith(DataStream)} 
method is
+        * the data stream that will be fed back and used as the input for the
+        * iteration head. A common usage pattern for streaming iterations is 
to use
+        * output splitting to send a part of the closing data stream to the 
head.
+        * Refer to {@link SingleOutputStreamOperator#split(OutputSelector)} for
+        * more information.
+        * <p>
+        * The iteration edge will be partitioned the same way as the first 
input of
+        * the iteration head.
+        * <p>
+        * By default a DataStream with iteration will never terminate, but the 
user
+        * can use the the maxWaitTime parameter to set a max waiting time for 
the
+        * iteration head. If no data received in the set time, the stream
+        * terminates.
+        * 
+        * @param maxWaitTimeMillis
+        *            Number of milliseconds to wait between inputs before 
shutting
+        *            down
+        * 
+        * @return The iterative data stream created.
+        */
+       public IterativeDataStream<OUT> iterate(long maxWaitTimeMillis) {
+               return new IterativeDataStream<OUT>(this, maxWaitTimeMillis);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f165c353/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index d8497ae..78518c0 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -31,12 +31,12 @@ public class IterativeDataStream<IN> extends
        protected Integer iterationID;
        protected long waitTime;
 
-       protected IterativeDataStream(DataStream<IN> dataStream) {
+       protected IterativeDataStream(DataStream<IN> dataStream, long 
maxWaitTime) {
                super(dataStream);
                setBufferTimeout(dataStream.environment.getBufferTimeout());
                iterationID = iterationCount;
                iterationCount++;
-               waitTime = 0;
+               waitTime = maxWaitTime;
        }
 
        protected IterativeDataStream(DataStream<IN> dataStream, Integer 
iterationID, long waitTime) {
@@ -71,20 +71,6 @@ public class IterativeDataStream<IN> extends
                return iterationTail;
        }
 
-       /**
-        * Sets the max waiting time for the next record before shutting down 
the
-        * stream. If not set, then the user needs to manually kill the process 
to
-        * stop.
-        * 
-        * @param waitTimeMillis
-        *            Max waiting time in milliseconds
-        * @return The modified DataStream.
-        */
-       public IterativeDataStream<IN> setMaxWaitTime(long waitTimeMillis) {
-               this.waitTime = waitTimeMillis;
-               return this;
-       }
-
        @Override
        protected IterativeDataStream<IN> copy() {
                return new IterativeDataStream<IN>(this, iterationID, waitTime);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f165c353/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index c1c0c6d..6ad827a 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -79,7 +79,7 @@ public class IterateTest {
 
                DataStream<Boolean> source = env.fromElements(false, false, 
false);
 
-               IterativeDataStream<Boolean> iteration = 
source.iterate().setMaxWaitTime(3000);
+               IterativeDataStream<Boolean> iteration = source.iterate(3000);
 
                DataStream<Boolean> increment = iteration.flatMap(new 
IterationHead()).flatMap(
                                new IterationTail());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f165c353/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 4a51c41..a5dc68a 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -70,12 +70,9 @@ public class IterateExample {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment()
                                .setBufferTimeout(1);
 
-               // create an iterative data stream from the input
+               // create an iterative data stream from the input with 5 second 
timeout
                IterativeDataStream<Tuple2<Double, Integer>> it = 
env.fromCollection(input).shuffle()
-                               .iterate();
-
-               // trigger iteration termination if no new data received for 5 
seconds
-               it.setMaxWaitTime(5000);
+                               .iterate(5000);
 
                // apply the step function to add new random value to the tuple 
and to
                // increment the counter and split the output with the output 
selector

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f165c353/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index f0932d8..5f46c84 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -68,7 +68,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   /**
    * Gets the underlying java DataStream object.
    */
-  private[flink] def getJavaStream: JavaStream[T] = javaStream
+  def getJavaStream: JavaStream[T] = javaStream
 
   /**
    * Sets the degree of parallelism of this operation. This must be greater 
than 1.
@@ -200,6 +200,32 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def distribute: DataStream[T] = new DataStream[T](javaStream.distribute())
 
   /**
+   * Initiates an iterative part of the program that creates a loop by feeding
+   * back data streams. To create a streaming iteration the user needs to 
define
+   * a transformation that creates two DataStreams.The first one one is the 
output
+   * that will be fed back to the start of the iteration and the second is the 
output
+   * stream of the iterative part.
+   * <p>
+   * stepfunction: initialStream => (feedback, output)
+   * <p>
+   * A common pattern is to use output splitting to create feedback and output 
DataStream.
+   * Please refer to the .split(...) method of the DataStream
+   * <p>
+   * By default a DataStream with iteration will never terminate, but the user
+   * can use the maxWaitTime parameter to set a max waiting time for the 
iteration head.
+   * If no data received in the set time the stream terminates.
+   *
+   *
+   */
+  def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]), 
maxWaitTimeMillis: Long = 0): DataStream[T] = {
+    val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
+
+    val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
+    iterativeStream.closeWith(feedback.getJavaStream)
+    new DataStream[T](output.getJavaStream)
+  }
+
+  /**
    * Applies an aggregation that that gives the current maximum of the data 
stream at
    * the given position.
    *
@@ -232,33 +258,11 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Applies an aggregation that that gives the current maximum element of the 
data stream by
-   * the given position. When equality, returns the first.
-   *
-   */
-  def maxBy(field: Any): DataStream[T] = field match {
-    case field: Int => return new DataStream[T](javaStream.maxBy(field))
-    case field: String => return new DataStream[T](javaStream.maxBy(field))
-    case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
-  }
-
-  /**
-   * Applies an aggregation that that gives the current minimum element of the 
data stream by
-   * the given position. When equality, returns the first.
-   *
-   */
-  def minBy(field: Any): DataStream[T] = field match {
-    case field: Int => return new DataStream[T](javaStream.minBy(field))
-    case field: String => return new DataStream[T](javaStream.minBy(field))
-    case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
-  }
-
-  /**
    * Applies an aggregation that that gives the current minimum element of the 
data stream by
    * the given position. When equality, the user can set to get the first or 
last element with the minimal value.
    *
    */
-  def minBy(field: Any, first: Boolean): DataStream[T] = field match {
+  def minBy(field: Any, first: Boolean = true): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.minBy(field, first))
     case field: String => return new DataStream[T](javaStream.minBy(field, 
first))
     case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
@@ -269,7 +273,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * the given position. When equality, the user can set to get the first or 
last element with the maximal value.
    *
    */
-  def maxBy(field: Any, first: Boolean): DataStream[T] = field match {
+  def maxBy(field: Any, first: Boolean = true): DataStream[T] = field match {
     case field: Int => return new DataStream[T](javaStream.maxBy(field, first))
     case field: String => return new DataStream[T](javaStream.maxBy(field, 
first))
     case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
@@ -469,15 +473,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * is written.
    *
    */
-  def writeAsText(path: String, millis: Long): DataStream[T] = new 
DataStream[T](javaStream.writeAsText(path, millis))
-
-  /**
-   * Writes a DataStream to the file specified by path in text format.
-   * For every element of the DataStream the result of .toString
-   * is written.
-   *
-   */
-  def writeAsText(path: String): DataStream[T] = new 
DataStream[T](javaStream.writeAsText(path))
+  def writeAsText(path: String, millis: Long = 0): DataStream[T] = new 
DataStream[T](javaStream.writeAsText(path, millis))
 
   /**
    * Writes a DataStream to the file specified by path in text format. The
@@ -486,15 +482,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * is written.
    *
    */
-  def writeAsCsv(path: String, millis: Long): DataStream[T] = new 
DataStream[T](javaStream.writeAsCsv(path, millis))
-
-  /**
-   * Writes a DataStream to the file specified by path in text format.
-   * For every element of the DataStream the result of .toString
-   * is written.
-   *
-   */
-  def writeAsCsv(path: String): DataStream[T] = new 
DataStream[T](javaStream.writeAsCsv(path))
+  def writeAsCsv(path: String, millis: Long = 0): DataStream[T] = new 
DataStream[T](javaStream.writeAsCsv(path, millis))
 
   /**
    * Adds the given sink to this DataStream. Only streams with sinks added

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f165c353/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
index dadfde2..340ecc1 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
@@ -117,14 +117,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    */
   def generateSequence(from: Long, to: Long): DataStream[Long] = {
-    val source = new SourceFunction[Long] {
-      override def invoke(out: Collector[Long]) = {
-        for (i <- from.to(to)) {
-          out.collect(i)
-        }
-      }
-    }
-    addSource(source)
+    new DataStream[java.lang.Long](javaEnv.generateSequence(from, 
to)).asInstanceOf[DataStream[Long]]
   }
 
   /**

Reply via email to