Repository: flink
Updated Branches:
  refs/heads/master 490076a97 -> 3f3aeb7e0


[FLINK-2138] [streaming] Added custom partitioning to scala DataStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc8d7c47
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc8d7c47
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc8d7c47

Branch: refs/heads/master
Commit: bc8d7c47e06a91c722906cc753924f1a89c1ed00
Parents: 97d1007
Author: Gábor Hermann <[email protected]>
Authored: Fri Jun 26 17:56:16 2015 +0200
Committer: Gyula Fora <[email protected]>
Committed: Sat Jul 11 14:00:56 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/scala/DataSet.scala    |  2 +-
 .../streaming/api/datastream/DataStream.java    |  3 +-
 .../flink/streaming/api/scala/DataStream.scala  | 54 +++++++++++++++++---
 .../streaming/api/scala/DataStreamTest.scala    | 39 +++++++++++++-
 4 files changed, 85 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d7c47/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index b14c9c2..fd1492a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1197,7 +1197,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
 
   /**
    * Partitions a DataSet on the key returned by the selector, using a custom 
partitioner.
-   * This method takes the key selector t get the key to partition on, and a 
partitioner that
+   * This method takes the key selector to get the key to partition on, and a 
partitioner that
    * accepts the key type.
    * <p>
    * Note: This method works only on single field keys, i.e. the selector 
cannot return tuples

http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d7c47/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8fb896e..c9c1f49 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -498,7 +497,7 @@ public class DataStream<OUT> {
         * @return The partitioned DataStream.
         * @see KeySelector
         */
-       public <K extends Comparable<K>> DataStream<OUT> 
partitionCustom(Partitioner<K> partitioner, KeySelector<OUT, K> keySelector) {
+       public <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, 
KeySelector<OUT, K> keySelector) {
                return setConnectionType(new CustomPartitionerWrapper<K, 
OUT>(clean(partitioner), clean(keySelector)));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d7c47/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index d0441a9..fbd6502 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,22 +18,23 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.api.common.io.OutputFormat
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
-import org.apache.flink.core.fs.{FileSystem, Path}
-
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
-import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, 
FoldFunction, MapFunction, ReduceFunction}
+import org.apache.flink.api.common.functions.{ReduceFunction, FlatMapFunction, 
MapFunction,
+  Partitioner, FoldFunction, FilterFunction}
+import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
+import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, 
DataStreamSink, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, 
DataStreamSink,
+  GroupedDataStream, SingleOutputStreamOperator}
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import 
org.apache.flink.streaming.api.functions.sink.{FileSinkFunctionByMillis, 
SinkFunction}
 import 
org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, 
SumAggregator}
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, 
StreamReduce}
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, 
TriggerPolicy}
@@ -289,6 +290,43 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
+   * Partitions a tuple DataStream on the specified key fields using a custom 
partitioner.
+   * This method takes the key position to partition on, and a partitioner 
that accepts the key
+   * type.
+   * <p>
+   * Note: This method works only on single field keys.
+   */
+  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: 
Int) : DataStream[T] =
+    javaStream.partitionCustom(partitioner, field)
+
+  /**
+   * Partitions a POJO DataStream on the specified key fields using a custom 
partitioner.
+   * This method takes the key expression to partition on, and a partitioner 
that accepts the key
+   * type.
+   * <p>
+   * Note: This method works only on single field keys.
+   */
+  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: 
String)
+  : DataStream[T] = javaStream.partitionCustom(partitioner, field)
+
+  /**
+   * Partitions a DataStream on the key returned by the selector, using a 
custom partitioner.
+   * This method takes the key selector to get the key to partition on, and a 
partitioner that
+   * accepts the key type.
+   * <p>
+   * Note: This method works only on single field keys, i.e. the selector 
cannot return tuples
+   * of fields.
+   */
+  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T 
=> K)
+  : DataStream[T] = {
+    val cleanFun = clean(fun)
+    val keyExtractor = new KeySelector[T, K] {
+      def getKey(in: T) = cleanFun(in)
+    }
+    javaStream.partitionCustom(partitioner, keyExtractor)
+  }
+
+  /**
    * Sets the partitioning of the DataStream so that the output tuples
    * are broad casted to every parallel instance of the next component. This
    * setting only effects the how the outputs will be distributed between the
@@ -296,7 +334,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def broadcast: DataStream[T] = javaStream.broadcast()
-  
+
   /**
    * Sets the partitioning of the DataStream so that the output values all go 
to 
    * the first instance of the next processing operator. Use this setting with 
care

http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d7c47/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index aa1c219..5d44e6b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -20,11 +20,12 @@ package org.apache.flink.streaming.api.scala
 
 import java.lang
 
-import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, 
MapFunction,
+  Partitioner, FoldFunction, Function}
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
-import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph, 
StreamNode}
+import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
 import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, 
StreamOperator}
 import org.apache.flink.streaming.api.windowing.helper.Count
 import org.apache.flink.streaming.runtime.partitioner._
@@ -105,6 +106,36 @@ class DataStreamTest {
     assert(isPartitioned(graph.getStreamEdge(group3.getId, 
createDownStreamId(group3))))
     assert(isPartitioned(graph.getStreamEdge(group4.getId, 
createDownStreamId(group4))))
 
+    //Testing DataStream partitioning
+    val partition1: DataStream[_] = src1.partitionByHash(0)
+    val partition2: DataStream[_] = src1.partitionByHash(1, 0)
+    val partition3: DataStream[_] = src1.partitionByHash("_1")
+    val partition4: DataStream[_] = src1.partitionByHash((x : (Long, Long)) => 
x._1);
+
+    assert(isPartitioned(graph.getStreamEdge(partition1.getId, 
createDownStreamId(partition1))))
+    assert(isPartitioned(graph.getStreamEdge(partition2.getId, 
createDownStreamId(partition2))))
+    assert(isPartitioned(graph.getStreamEdge(partition3.getId, 
createDownStreamId(partition3))))
+    assert(isPartitioned(graph.getStreamEdge(partition4.getId, 
createDownStreamId(partition4))))
+
+    // Testing DataStream custom partitioning
+    val longPartitioner: Partitioner[Long] = new Partitioner[Long] {
+      override def partition(key: Long, numPartitions: Int): Int = 0
+    }
+
+    val customPartition1: DataStream[_] =
+      src1.partitionCustom(longPartitioner, 0)
+    val customPartition3: DataStream[_] =
+      src1.partitionCustom(longPartitioner, "_1")
+    val customPartition4: DataStream[_] =
+      src1.partitionCustom(longPartitioner, (x : (Long, Long)) => x._1)
+
+    assert(isCustomPartitioned(
+      graph.getStreamEdge(customPartition1.getId, 
createDownStreamId(customPartition1))))
+    assert(isCustomPartitioned(
+      graph.getStreamEdge(customPartition3.getId, 
createDownStreamId(customPartition3))))
+    assert(isCustomPartitioned(
+      graph.getStreamEdge(customPartition4.getId, 
createDownStreamId(customPartition4))))
+
     //Testing ConnectedDataStream grouping
     val connectedGroup1: ConnectedDataStream[_, _] = connected.groupBy(0, 0)
     val downStreamId1: Integer = createDownStreamId(connectedGroup1)
@@ -465,6 +496,10 @@ class DataStreamTest {
     return edge.getPartitioner.isInstanceOf[FieldsPartitioner[_]]
   }
 
+  private def isCustomPartitioned(edge: StreamEdge): Boolean = {
+    return edge.getPartitioner.isInstanceOf[CustomPartitionerWrapper[_, _]]
+  }
+
   private def createDownStreamId(dataStream: DataStream[_]): Integer = {
     return dataStream.print.getId
   }

Reply via email to