[scala] [streaming] Added SplitDataStream functionality
Conflicts:
flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4c4f26e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4c4f26e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4c4f26e
Branch: refs/heads/release-0.8
Commit: a4c4f26e70088ed5cd5bf8a8f01585f09674cf04
Parents: e0ab489
Author: mbalassi <[email protected]>
Authored: Mon Dec 15 14:15:35 2014 +0100
Committer: mbalassi <[email protected]>
Committed: Mon Jan 5 17:53:28 2015 +0100
----------------------------------------------------------------------
.../api/datastream/SplitDataStream.java | 4 +-
.../flink/api/scala/streaming/DataStream.scala | 33 ++++++++++++-
.../api/scala/streaming/SplitDataStream.scala | 49 ++++++++++++++++++++
3 files changed, 83 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a4c4f26e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 5a8f038..4fac04c 100755
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -70,9 +70,9 @@ public class SplitDataStream<OUT> {
return returnStream;
}
- private DataStream<OUT> selectOutput(String[] outputName) {
+ private DataStream<OUT> selectOutput(String[] outputNames) {
DataStream<OUT> returnStream = dataStream.copy();
- returnStream.userDefinedNames = Arrays.asList(outputName);
+ returnStream.userDefinedNames = Arrays.asList(outputNames);
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a4c4f26e/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index 69b8359..f0932d8 100644
---
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -41,7 +41,9 @@ import
org.apache.flink.streaming.api.function.sink.SinkFunction
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy
+import org.apache.flink.streaming.api.collector.OutputSelector
import scala.collection.JavaConversions._
+import java.util.HashMap
class DataStream[T](javaStream: JavaStream[T]) {
@@ -423,7 +425,36 @@ class DataStream[T](javaStream: JavaStream[T]) {
def window(triggers: List[TriggerPolicy[T]], evicters:
List[EvictionPolicy[T]]): WindowedDataStream[T] = new
WindowedDataStream[T](javaStream.window(triggers, evicters))
/**
- * >>>>>>> 12178aa... [scala] [streaming] Windowing functionality added to
scala api
+ *
+ * Operator used for directing tuples to specific named outputs using an
+ * OutputSelector. Calling this method on an operator creates a new
+ * SplitDataStream.
+ */
+ def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream
match {
+ case op: SingleOutputStreamOperator[_, _] => new
SplitDataStream[T](op.split(selector))
+ case _ =>
+ throw new UnsupportedOperationException("Operator " +
javaStream.toString + " can not be " +
+ "split.")
+ }
+
+ /**
+ * Creates a new SplitDataStream that directs each element to the named
+ * outputs returned by the given output selector function.
+ */
+ def split(fun: T => TraversableOnce[String]): SplitDataStream[T] = {
+ if (fun == null) {
+ throw new NullPointerException("OutputSelector must not be null.")
+ }
+ val selector = new OutputSelector[T] {
+ val cleanFun = clean(fun)
+ def select(in: T): java.lang.Iterable[String] = {
+ asJavaIterable(cleanFun(in).toIterable)
+ }
+ }
+ split(selector)
+ }
+
+ /**
* Writes a DataStream to the standard output stream (stdout). For each
* element of the DataStream the result of .toString is
* written.
http://git-wip-us.apache.org/repos/asf/flink/blob/a4c4f26e/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
----------------------------------------------------------------------
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
new file mode 100644
index 0000000..0b0cce5
--- /dev/null
+++
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.{ SplitDataStream =>
SplitJavaStream }
+
+/**
+ * The SplitDataStream represents an operator that has been split using an
+ * {@link OutputSelector}. Named outputs can be selected using the
+ * {@link #select} function.
+ *
+ * @tparam T
+ * The type of the output.
+ */
+class SplitDataStream[T](javaStream: SplitJavaStream[T]) {
+
+ /**
+ * Gets the underlying Java SplitDataStream object.
+ */
+ private[flink] def getJavaStream: SplitJavaStream[T] = javaStream
+
+ /**
+ * Sets the output names for which the next operator will receive values.
+ */
+ def select(outputNames: String*): DataStream[T] =
+ new DataStream[T](javaStream.select(outputNames: _*))
+
+ /**
+ * Selects all output names from a split data stream.
+ */
+ def selectAll(): DataStream[T] = new DataStream[T](javaStream.selectAll())
+
+}
\ No newline at end of file