[scala] [streaming] Added SplitDataStream functionality

Conflicts:
        flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7aa68298
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7aa68298
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7aa68298

Branch: refs/heads/master
Commit: 7aa682982b980dd80955c243797d97f2c083ae7c
Parents: 80393c4
Author: mbalassi <[email protected]>
Authored: Mon Dec 15 14:15:35 2014 +0100
Committer: Gyula Fora <[email protected]>
Committed: Fri Jan 2 18:34:38 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/SplitDataStream.java         |  4 +-
 .../flink/api/scala/streaming/DataStream.scala  | 33 ++++++++++++-
 .../api/scala/streaming/SplitDataStream.scala   | 49 ++++++++++++++++++++
 3 files changed, 83 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7aa68298/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 5a8f038..4fac04c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -70,9 +70,9 @@ public class SplitDataStream<OUT> {
                return returnStream;
        }
 
-       private DataStream<OUT> selectOutput(String[] outputName) {
+       private DataStream<OUT> selectOutput(String[] outputNames) {
                DataStream<OUT> returnStream = dataStream.copy();
-               returnStream.userDefinedNames = Arrays.asList(outputName);
+               returnStream.userDefinedNames = Arrays.asList(outputNames);
                return returnStream;
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7aa68298/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index 69b8359..f0932d8 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -41,7 +41,9 @@ import org.apache.flink.streaming.api.function.sink.SinkFunction
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy
+import org.apache.flink.streaming.api.collector.OutputSelector
 import scala.collection.JavaConversions._
+import java.util.HashMap
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
@@ -423,7 +425,36 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(triggers, evicters))
 
   /**
-   * >>>>>>> 12178aa... [scala] [streaming] Windowing functionality added to scala api
+   *
+   * Operator used for directing tuples to specific named outputs using an
+   * OutputSelector. Calling this method on an operator creates a new
+   * SplitDataStream.
+   */
+  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream match {
+    case op: SingleOutputStreamOperator[_, _] => new SplitDataStream[T](op.split(selector))
+    case _ =>
+      throw new UnsupportedOperationException("Operator " + javaStream.toString + " can not be " +
+        "split.")
+  }
+
+  /**
+   * Creates a new SplitDataStream that contains only the elements satisfying the
+   *  given output selector predicate.
+   */
+  def split(fun: T => TraversableOnce[String]): SplitDataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("OutputSelector must not be null.")
+    }
+    val selector = new OutputSelector[T] {
+      val cleanFun = clean(fun)
+      def select(in: T): java.lang.Iterable[String] = {
+        asJavaIterable(cleanFun(in).toIterable)
+      }
+    }
+    split(selector)
+  }
+
+  /**
    * Writes a DataStream to the standard output stream (stdout). For each
    * element of the DataStream the result of .toString is
    * written.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7aa68298/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
new file mode 100644
index 0000000..0b0cce5
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
+
+/**
+ * The SplitDataStream represents an operator that has been split using an
+ * {@link OutputSelector}. Named outputs can be selected using the
+ * {@link #select} function.
+ *
+ * @param <OUT>
+ *            The type of the output.
+ */
+class SplitDataStream[T](javaStream: SplitJavaStream[T]) {
+
+  /**
+   * Gets the underlying java DataStream object.
+   */
+  private[flink] def getJavaStream: SplitJavaStream[T] = javaStream
+
+  /**
+   *  Sets the output names for which the next operator will receive values.
+   */
+  def select(outputNames: String*): DataStream[T] =
+    new DataStream[T](javaStream.select(outputNames: _*))
+
+  /**
+   * Selects all output names from a split data stream.
+   */
+  def selectAll(): DataStream[T] = new DataStream[T](javaStream.selectAll())
+
+}
\ No newline at end of file

Reply via email to