Repository: flink Updated Branches: refs/heads/master 490076a97 -> 3f3aeb7e0
[FLINK-2138] [streaming] Added custom partitioning to scala DataStream Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc8d7c47 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc8d7c47 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc8d7c47 Branch: refs/heads/master Commit: bc8d7c47e06a91c722906cc753924f1a89c1ed00 Parents: 97d1007 Author: Gábor Hermann <[email protected]> Authored: Fri Jun 26 17:56:16 2015 +0200 Committer: Gyula Fora <[email protected]> Committed: Sat Jul 11 14:00:56 2015 +0200 ---------------------------------------------------------------------- .../org/apache/flink/api/scala/DataSet.scala | 2 +- .../streaming/api/datastream/DataStream.java | 3 +- .../flink/streaming/api/scala/DataStream.scala | 54 +++++++++++++++++--- .../streaming/api/scala/DataStreamTest.scala | 39 +++++++++++++- 4 files changed, 85 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d7c47/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index b14c9c2..fd1492a 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -1197,7 +1197,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { /** * Partitions a DataSet on the key returned by the selector, using a custom partitioner. - * This method takes the key selector t get the key to partition on, and a partitioner that + * This method takes the key selector to get the key to partition on, and a partitioner that * accepts the key type. * <p> * Note: This method works only on single field keys, i.e. the selector cannot return tuples http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d7c47/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 8fb896e..c9c1f49 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -498,7 +497,7 @@ public class DataStream<OUT> { * @return The partitioned DataStream. * @see KeySelector */ - public <K extends Comparable<K>> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, KeySelector<OUT, K> keySelector) { + public <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, KeySelector<OUT, K> keySelector) { return setConnectionType(new CustomPartitionerWrapper<K, OUT>(clean(partitioner), clean(keySelector))); } http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d7c47/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index d0441a9..fbd6502 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -18,22 +18,23 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.api.common.io.OutputFormat -import org.apache.flink.api.scala.ClosureCleaner -import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat -import org.apache.flink.core.fs.{FileSystem, Path} - import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, FoldFunction, MapFunction, ReduceFunction} +import org.apache.flink.api.common.functions.{ReduceFunction, FlatMapFunction, MapFunction, + Partitioner, FoldFunction, FilterFunction} +import org.apache.flink.api.common.io.OutputFormat import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat +import org.apache.flink.core.fs.{FileSystem, Path} import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator} +import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, + GroupedDataStream, SingleOutputStreamOperator} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType -import org.apache.flink.streaming.api.functions.sink.{FileSinkFunctionByMillis, SinkFunction} import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} +import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, StreamReduce} import org.apache.flink.streaming.api.windowing.helper.WindowingHelper import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy} @@ -289,6 +290,43 @@ class DataStream[T](javaStream: JavaStream[T]) { } /** + * Partitions a tuple DataStream on the specified key fields using a custom partitioner. + * This method takes the key position to partition on, and a partitioner that accepts the key + * type. + * <p> + * Note: This method works only on single field keys. + */ + def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] = + javaStream.partitionCustom(partitioner, field) + + /** + * Partitions a POJO DataStream on the specified key fields using a custom partitioner. + * This method takes the key expression to partition on, and a partitioner that accepts the key + * type. + * <p> + * Note: This method works only on single field keys. + */ + def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String) + : DataStream[T] = javaStream.partitionCustom(partitioner, field) + + /** + * Partitions a DataStream on the key returned by the selector, using a custom partitioner. + * This method takes the key selector to get the key to partition on, and a partitioner that + * accepts the key type. + * <p> + * Note: This method works only on single field keys, i.e. the selector cannot return tuples + * of fields. + */ + def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K) + : DataStream[T] = { + val cleanFun = clean(fun) + val keyExtractor = new KeySelector[T, K] { + def getKey(in: T) = cleanFun(in) + } + javaStream.partitionCustom(partitioner, keyExtractor) + } + + /** * Sets the partitioning of the DataStream so that the output tuples * are broad casted to every parallel instance of the next component. This * setting only effects the how the outputs will be distributed between the @@ -296,7 +334,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def broadcast: DataStream[T] = javaStream.broadcast() - + /** * Sets the partitioning of the DataStream so that the output values all go to * the first instance of the next processing operator. Use this setting with care http://git-wip-us.apache.org/repos/asf/flink/blob/bc8d7c47/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index aa1c219..5d44e6b 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -20,11 +20,12 @@ package org.apache.flink.streaming.api.scala import java.lang -import org.apache.flink.api.common.functions._ +import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, + Partitioner, FoldFunction, Function} import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.functions.co.CoMapFunction -import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph, StreamNode} +import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph} import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator} import org.apache.flink.streaming.api.windowing.helper.Count import org.apache.flink.streaming.runtime.partitioner._ @@ -105,6 +106,36 @@ class DataStreamTest { assert(isPartitioned(graph.getStreamEdge(group3.getId, createDownStreamId(group3)))) assert(isPartitioned(graph.getStreamEdge(group4.getId, createDownStreamId(group4)))) + //Testing DataStream partitioning + val partition1: DataStream[_] = src1.partitionByHash(0) + val partition2: DataStream[_] = src1.partitionByHash(1, 0) + val partition3: DataStream[_] = src1.partitionByHash("_1") + val partition4: DataStream[_] = src1.partitionByHash((x : (Long, Long)) => x._1); + + assert(isPartitioned(graph.getStreamEdge(partition1.getId, createDownStreamId(partition1)))) + assert(isPartitioned(graph.getStreamEdge(partition2.getId, createDownStreamId(partition2)))) + assert(isPartitioned(graph.getStreamEdge(partition3.getId, createDownStreamId(partition3)))) + assert(isPartitioned(graph.getStreamEdge(partition4.getId, createDownStreamId(partition4)))) + + // Testing DataStream custom partitioning + val longPartitioner: Partitioner[Long] = new Partitioner[Long] { + override def partition(key: Long, numPartitions: Int): Int = 0 + } + + val customPartition1: DataStream[_] = + src1.partitionCustom(longPartitioner, 0) + val customPartition3: DataStream[_] = + src1.partitionCustom(longPartitioner, "_1") + val customPartition4: DataStream[_] = + src1.partitionCustom(longPartitioner, (x : (Long, Long)) => x._1) + + assert(isCustomPartitioned( + graph.getStreamEdge(customPartition1.getId, createDownStreamId(customPartition1)))) + assert(isCustomPartitioned( + graph.getStreamEdge(customPartition3.getId, createDownStreamId(customPartition3)))) + assert(isCustomPartitioned( + graph.getStreamEdge(customPartition4.getId, createDownStreamId(customPartition4)))) + //Testing ConnectedDataStream grouping val connectedGroup1: ConnectedDataStream[_, _] = connected.groupBy(0, 0) val downStreamId1: Integer = createDownStreamId(connectedGroup1) @@ -465,6 +496,10 @@ class DataStreamTest { return edge.getPartitioner.isInstanceOf[FieldsPartitioner[_]] } + private def isCustomPartitioned(edge: StreamEdge): Boolean = { + return edge.getPartitioner.isInstanceOf[CustomPartitionerWrapper[_, _]] + } + private def createDownStreamId(dataStream: DataStream[_]): Integer = { return dataStream.print.getId }
