[FLINK-2138] [streaming] Added custom partitioning to DataStream
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97d10070 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97d10070 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97d10070 Branch: refs/heads/master Commit: 97d10070c7ff5986b8e7ee08dcb6a7e74473cd25 Parents: 490076a Author: Gábor Hermann <[email protected]> Authored: Fri Jun 26 17:23:36 2015 +0200 Committer: Gyula Fora <[email protected]> Committed: Sat Jul 11 14:00:56 2015 +0200 ---------------------------------------------------------------------- .../java/org/apache/flink/api/java/DataSet.java | 4 +- .../streaming/api/datastream/DataStream.java | 70 ++++++++++++++++++-- .../partitioner/CustomPartitionerWrapper.java | 57 ++++++++++++++++ .../runtime/partitioner/StreamPartitioner.java | 2 +- .../streaming/util/keys/KeySelectorUtil.java | 40 +++++++++++ .../flink/streaming/api/DataStreamTest.java | 28 +++++++- 6 files changed, 193 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index e217e53..d24a350 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -1128,14 +1128,14 @@ public abstract class DataSet<T> { /** * Partitions a DataSet on the key returned by the selector, using a custom partitioner. - * This method takes the key selector t get the key to partition on, and a partitioner that + * This method takes the key selector to get the key to partition on, and a partitioner that * accepts the key type. 
* <p> * Note: This method works only on single field keys, i.e. the selector cannot return tuples * of fields. * * @param partitioner The partitioner to assign partitions to keys. - * @param keyExtractor The KeyExtractor with which the DataSet is hash-partitioned. + * @param keyExtractor The KeyExtractor with which the DataSet is partitioned. * @return The partitioned DataSet. * * @see KeySelector http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index bf0ff23..8fb896e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -25,6 +25,8 @@ import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -64,9 +66,10 @@ import org.apache.flink.streaming.api.windowing.helper.WindowingHelper; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import 
org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; +import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper; +import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner; import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; -import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.util.keys.KeySelectorUtil; @@ -81,7 +84,6 @@ import com.google.common.base.Preconditions; * <ul> * <li>{@link DataStream#map},</li> * <li>{@link DataStream#filter}, or</li> - * <li>{@link DataStream#sum}.</li> * </ul> * * @param <OUT> @@ -451,6 +453,66 @@ public class DataStream<OUT> { } /** + * Partitions a tuple DataStream on the specified key fields using a custom partitioner. + * This method takes the key position to partition on, and a partitioner that accepts the key type. + * <p> + * Note: This method works only on single field keys. + * + * @param partitioner The partitioner to assign partitions to keys. + * @param field The field index on which the DataStream is to be partitioned. + * @return The partitioned DataStream. + */ + public <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, int field) { + Keys.ExpressionKeys<OUT> outExpressionKeys = new Keys.ExpressionKeys<OUT>(new int[]{field}, getType()); + return partitionCustom(partitioner, outExpressionKeys); + } + + /** + * Partitions a POJO DataStream on the specified key fields using a custom partitioner. + * This method takes the key expression to partition on, and a partitioner that accepts the key type. + * <p> + * Note: This method works only on single field keys. 
+ * + * @param partitioner The partitioner to assign partitions to keys. + * @param field The field expression on which the DataStream is to be partitioned. + * @return The partitioned DataStream. + */ + public <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, String field) { + Keys.ExpressionKeys<OUT> outExpressionKeys = new Keys.ExpressionKeys<OUT>(new String[]{field}, getType()); + return partitionCustom(partitioner, outExpressionKeys); + } + + + /** + * Partitions a DataStream on the key returned by the selector, using a custom partitioner. + * This method takes the key selector to get the key to partition on, and a partitioner that + * accepts the key type. + * <p> + * Note: This method works only on single field keys, i.e. the selector cannot return tuples + * of fields. + * + * @param partitioner + * The partitioner to assign partitions to keys. + * @param keySelector + * The KeySelector with which the DataStream is partitioned. + * @return The partitioned DataStream. + * @see KeySelector + */ + public <K extends Comparable<K>> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, KeySelector<OUT, K> keySelector) { + return setConnectionType(new CustomPartitionerWrapper<K, OUT>(clean(partitioner), clean(keySelector))); + } + + // private helper method for custom partitioning + private <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, Keys<OUT> keys) { + KeySelector<OUT, K> keySelector = KeySelectorUtil.getSelectorForOneKey(keys, partitioner, getType(), getExecutionConfig()); + + return setConnectionType( + new CustomPartitionerWrapper<K, OUT>( + clean(partitioner), + clean(keySelector))); + } + + /** * Sets the partitioning of the {@link DataStream} so that the output tuples * are broadcasted to every parallel instance of the next component. * @@ -530,7 +592,7 @@ public class DataStream<OUT> { * iteration head. 
The user can also use different feedback type than the * input of the iteration and treat the input and feedback streams as a * {@link ConnectedDataStream} be calling - * {@link IterativeDataStream#withFeedbackType(TypeInfo)} + * {@link IterativeDataStream#withFeedbackType(TypeInformation)} * <p> * A common usage pattern for streaming iterations is to use output * splitting to send a part of the closing data stream to the head. Refer to @@ -561,7 +623,7 @@ public class DataStream<OUT> { * iteration head. The user can also use different feedback type than the * input of the iteration and treat the input and feedback streams as a * {@link ConnectedDataStream} be calling - * {@link IterativeDataStream#withFeedbackType(TypeInfo)} + * {@link IterativeDataStream#withFeedbackType(TypeInformation)} * <p> * A common usage pattern for streaming iterations is to use output * splitting to send a part of the closing data stream to the head. Refer to http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java new file mode 100644 index 0000000..75867cd --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.partitioner; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * Partitioner that selects the channel with a user defined partitioner function on a key. 
+ * + * @param <K> + * Type of the key + * @param <T> + * Type of the data + */ +public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> { + private static final long serialVersionUID = 1L; + + private int[] returnArray = new int[1]; + Partitioner<K> partitioner; + KeySelector<T, K> keySelector; + + public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) { + super(PartitioningStrategy.CUSTOM); + this.partitioner = partitioner; + this.keySelector = keySelector; + } + + @Override + public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, + int numberOfOutputChannels) { + + K key = record.getInstance().getKey(keySelector); + + returnArray[0] = partitioner.partition(key, + numberOfOutputChannels); + + return returnArray; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java index 3af7c7a..ef598c6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java @@ -27,7 +27,7 @@ public abstract class StreamPartitioner<T> implements public enum PartitioningStrategy { - FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY + FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY, CUSTOM } 
http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java index 77467b5..49f2fe0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.util.keys; import java.lang.reflect.Array; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -71,6 +72,45 @@ public class KeySelectorUtil { return new ComparableKeySelector<X>(comparator, keyLength); } + public static <X, K> KeySelector<X, K> getSelectorForOneKey(Keys<X> keys, Partitioner<K> partitioner, TypeInformation<X> typeInfo, + ExecutionConfig executionConfig) { + if (partitioner != null) { + keys.validateCustomPartitioner(partitioner, null); + } + + int[] logicalKeyPositions = keys.computeLogicalKeyPositions(); + + if (logicalKeyPositions.length != 1) { + throw new IllegalArgumentException("There must be exactly 1 key specified"); + } + + TypeComparator<X> comparator = ((CompositeType<X>) typeInfo).createComparator( + logicalKeyPositions, new boolean[1], 0, executionConfig); + return new OneKeySelector<X, K>(comparator); + } + + public static class OneKeySelector<IN, K> 
implements KeySelector<IN, K> { + + private static final long serialVersionUID = 1L; + + private TypeComparator<IN> comparator; + private Object[] keyArray; + private K key; + + public OneKeySelector(TypeComparator<IN> comparator) { + this.comparator = comparator; + keyArray = new Object[1]; + } + + @Override + public K getKey(IN value) throws Exception { + comparator.extractKeys(value, keyArray, 0); + key = (K) keyArray[0]; + return key; + } + + } + public static class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index f3b98b2..764c6f2 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -52,6 +53,7 @@ import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import 
org.apache.flink.streaming.api.windowing.helper.Count; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; +import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper; import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner; import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; @@ -64,7 +66,7 @@ import org.junit.Test; public class DataStreamTest { private static final long MEMORYSIZE = 32; - private static int PARALLELISM = 1; + private static int PARALLELISM = 2; /** * Tests {@link SingleOutputStreamOperator#name(String)} functionality. @@ -167,6 +169,26 @@ public class DataStreamTest { assertFalse(isGrouped(partition2)); assertFalse(isGrouped(partition4)); + // Testing DataStream custom partitioning + Partitioner<Long> longPartitioner = new Partitioner<Long>() { + @Override + public int partition(Long key, int numPartitions) { + return 100; + } + }; + + DataStream customPartition1 = src1.partitionCustom(longPartitioner, 0); + DataStream customPartition3 = src1.partitionCustom(longPartitioner, "f0"); + DataStream customPartition4 = src1.partitionCustom(longPartitioner, new FirstSelector()); + + assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition1.getId(), createDownStreamId(customPartition1)))); + assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition3.getId(), createDownStreamId(customPartition3)))); + assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition4.getId(), createDownStreamId(customPartition4)))); + + assertFalse(isGrouped(customPartition1)); + assertFalse(isGrouped(customPartition3)); + assertFalse(isGrouped(customPartition4)); + //Testing ConnectedDataStream grouping ConnectedDataStream connectedGroup1 = connected.groupBy(0, 0); Integer downStreamId1 = createDownStreamId(connectedGroup1); @@ -524,6 +546,10 @@ public class DataStreamTest { return 
edge.getPartitioner() instanceof FieldsPartitioner; } + private static boolean isCustomPartitioned(StreamEdge edge) { + return edge.getPartitioner() instanceof CustomPartitionerWrapper; + } + private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long> { @Override public Long getKey(Tuple2<Long, Long> value) throws Exception {
