[FLINK-2138] [streaming] Added custom partitioning to DataStream

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97d10070
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97d10070
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97d10070

Branch: refs/heads/master
Commit: 97d10070c7ff5986b8e7ee08dcb6a7e74473cd25
Parents: 490076a
Author: Gábor Hermann <[email protected]>
Authored: Fri Jun 26 17:23:36 2015 +0200
Committer: Gyula Fora <[email protected]>
Committed: Sat Jul 11 14:00:56 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |  4 +-
 .../streaming/api/datastream/DataStream.java    | 70 ++++++++++++++++++--
 .../partitioner/CustomPartitionerWrapper.java   | 57 ++++++++++++++++
 .../runtime/partitioner/StreamPartitioner.java  |  2 +-
 .../streaming/util/keys/KeySelectorUtil.java    | 40 +++++++++++
 .../flink/streaming/api/DataStreamTest.java     | 28 +++++++-
 6 files changed, 193 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index e217e53..d24a350 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -1128,14 +1128,14 @@ public abstract class DataSet<T> {
        
        /**
         * Partitions a DataSet on the key returned by the selector, using a 
custom partitioner.
-        * This method takes the key selector t get the key to partition on, 
and a partitioner that
+        * This method takes the key selector to get the key to partition on, 
and a partitioner that
         * accepts the key type.
         * <p>
         * Note: This method works only on single field keys, i.e. the selector 
cannot return tuples
         * of fields.
         * 
         * @param partitioner The partitioner to assign partitions to keys.
-        * @param keyExtractor The KeyExtractor with which the DataSet is 
hash-partitioned.
+        * @param keyExtractor The KeyExtractor with which the DataSet is 
partitioned.
         * @return The partitioned DataSet.
         * 
         * @see KeySelector

http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index bf0ff23..8fb896e 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -64,9 +66,10 @@ import 
org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
@@ -81,7 +84,6 @@ import com.google.common.base.Preconditions;
  * <ul>
  * <li>{@link DataStream#map},</li>
  * <li>{@link DataStream#filter}, or</li>
- * <li>{@link DataStream#sum}.</li>
  * </ul>
  * 
  * @param <OUT>
@@ -451,6 +453,66 @@ public class DataStream<OUT> {
        }
 
        /**
+        * Partitions a tuple DataStream on the specified key fields using a 
custom partitioner.
+        * This method takes the key position to partition on, and a 
partitioner that accepts the key type.
+        * <p>
+        * Note: This method works only on single field keys.
+        *
+        * @param partitioner The partitioner to assign partitions to keys.
+        * @param field The field index on which the DataStream is to 
partitioned.
+        * @return The partitioned DataStream.
+        */
+       public <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, 
int field) {
+               Keys.ExpressionKeys<OUT> outExpressionKeys = new 
Keys.ExpressionKeys<OUT>(new int[]{field}, getType());
+               return partitionCustom(partitioner, outExpressionKeys);
+       }
+
+       /**
+        * Partitions a POJO DataStream on the specified key fields using a 
custom partitioner.
+        * This method takes the key expression to partition on, and a 
partitioner that accepts the key type.
+        * <p>
+        * Note: This method works only on single field keys.
+        *
+        * @param partitioner The partitioner to assign partitions to keys.
+        * @param field The field index on which the DataStream is to 
partitioned.
+        * @return The partitioned DataStream.
+        */
+       public <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, 
String field) {
+               Keys.ExpressionKeys<OUT> outExpressionKeys = new 
Keys.ExpressionKeys<OUT>(new String[]{field}, getType());
+               return partitionCustom(partitioner, outExpressionKeys);
+       }
+
+
+       /**
+        * Partitions a DataStream on the key returned by the selector, using a 
custom partitioner.
+        * This method takes the key selector to get the key to partition on, 
and a partitioner that
+        * accepts the key type.
+        * <p>
+        * Note: This method works only on single field keys, i.e. the selector 
cannot return tuples
+        * of fields.
+        *
+        * @param partitioner
+        *              The partitioner to assign partitions to keys.
+        * @param keySelector
+        *              The KeySelector with which the DataStream is 
partitioned.
+        * @return The partitioned DataStream.
+        * @see KeySelector
+        */
+       public <K extends Comparable<K>> DataStream<OUT> 
partitionCustom(Partitioner<K> partitioner, KeySelector<OUT, K> keySelector) {
+               return setConnectionType(new CustomPartitionerWrapper<K, 
OUT>(clean(partitioner), clean(keySelector)));
+       }
+
+       //      private helper method for custom partitioning
+       private <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, 
Keys<OUT> keys) {
+               KeySelector<OUT, K> keySelector = 
KeySelectorUtil.getSelectorForOneKey(keys, partitioner, getType(), 
getExecutionConfig());
+
+               return setConnectionType(
+                               new CustomPartitionerWrapper<K, OUT>(
+                                               clean(partitioner),
+                                               clean(keySelector)));
+       }
+
+       /**
         * Sets the partitioning of the {@link DataStream} so that the output 
tuples
         * are broadcasted to every parallel instance of the next component.
         *
@@ -530,7 +592,7 @@ public class DataStream<OUT> {
         * iteration head. The user can also use different feedback type than 
the
         * input of the iteration and treat the input and feedback streams as a
         * {@link ConnectedDataStream} be calling
-        * {@link IterativeDataStream#withFeedbackType(TypeInfo)}
+        * {@link IterativeDataStream#withFeedbackType(TypeInformation)}
         * <p>
         * A common usage pattern for streaming iterations is to use output
         * splitting to send a part of the closing data stream to the head. 
Refer to
@@ -561,7 +623,7 @@ public class DataStream<OUT> {
         * iteration head. The user can also use different feedback type than 
the
         * input of the iteration and treat the input and feedback streams as a
         * {@link ConnectedDataStream} be calling
-        * {@link IterativeDataStream#withFeedbackType(TypeInfo)}
+        * {@link IterativeDataStream#withFeedbackType(TypeInformation)}
         * <p>
         * A common usage pattern for streaming iterations is to use output
         * splitting to send a part of the closing data stream to the head. 
Refer to

http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
new file mode 100644
index 0000000..75867cd
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that selects the channel with a user defined partitioner 
function on a key.
+ *
+ * @param <K>
+ *            Type of the key
+ * @param <T>
+ *            Type of the data
+ */
+public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
+       private static final long serialVersionUID = 1L;
+
+       private int[] returnArray = new int[1];
+       Partitioner<K> partitioner;
+       KeySelector<T, K> keySelector;
+
+       public CustomPartitionerWrapper(Partitioner<K> partitioner, 
KeySelector<T, K> keySelector) {
+               super(PartitioningStrategy.CUSTOM);
+               this.partitioner = partitioner;
+               this.keySelector = keySelector;
+       }
+
+       @Override
+       public int[] selectChannels(SerializationDelegate<StreamRecord<T>> 
record,
+                       int numberOfOutputChannels) {
+
+               K key = record.getInstance().getKey(keySelector);
+
+               returnArray[0] = partitioner.partition(key,
+                               numberOfOutputChannels);
+
+               return returnArray;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
index 3af7c7a..ef598c6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -27,7 +27,7 @@ public abstract class StreamPartitioner<T> implements
 
        public enum PartitioningStrategy {
 
-               FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY
+               FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY, CUSTOM
 
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 77467b5..49f2fe0 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.util.keys;
 import java.lang.reflect.Array;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -71,6 +72,45 @@ public class KeySelectorUtil {
                return new ComparableKeySelector<X>(comparator, keyLength);
        }
 
+       public static <X, K> KeySelector<X, K> getSelectorForOneKey(Keys<X> 
keys, Partitioner<K> partitioner, TypeInformation<X> typeInfo,
+                       ExecutionConfig executionConfig) {
+               if (partitioner != null) {
+                       keys.validateCustomPartitioner(partitioner, null);
+               }
+
+               int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
+
+               if (logicalKeyPositions.length != 1) {
+                       throw new IllegalArgumentException("There must be 
exactly 1 key specified");
+               }
+
+               TypeComparator<X> comparator = ((CompositeType<X>) 
typeInfo).createComparator(
+                               logicalKeyPositions, new boolean[1], 0, 
executionConfig);
+               return new OneKeySelector<X, K>(comparator);
+       }
+
+       public static class OneKeySelector<IN, K> implements KeySelector<IN, K> 
{
+
+               private static final long serialVersionUID = 1L;
+
+               private TypeComparator<IN> comparator;
+               private Object[] keyArray;
+               private K key;
+
+               public OneKeySelector(TypeComparator<IN> comparator) {
+                       this.comparator = comparator;
+                       keyArray = new Object[1];
+               }
+
+               @Override
+               public K getKey(IN value) throws Exception {
+                       comparator.extractKeys(value, keyArray, 0);
+                       key = (K) keyArray[0];
+                       return key;
+               }
+
+       }
+
        public static class ComparableKeySelector<IN> implements 
KeySelector<IN, Tuple> {
 
                private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/97d10070/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index f3b98b2..764c6f2 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -52,6 +53,7 @@ import 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
 import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
@@ -64,7 +66,7 @@ import org.junit.Test;
 public class DataStreamTest {
 
        private static final long MEMORYSIZE = 32;
-       private static int PARALLELISM = 1;
+       private static int PARALLELISM = 2;
 
        /**
         * Tests {@link SingleOutputStreamOperator#name(String)} functionality.
@@ -167,6 +169,26 @@ public class DataStreamTest {
                assertFalse(isGrouped(partition2));
                assertFalse(isGrouped(partition4));
 
+               // Testing DataStream custom partitioning
+               Partitioner<Long> longPartitioner = new Partitioner<Long>() {
+                       @Override
+                       public int partition(Long key, int numPartitions) {
+                               return 100;
+                       }
+               };
+
+               DataStream customPartition1 = 
src1.partitionCustom(longPartitioner, 0);
+               DataStream customPartition3 = 
src1.partitionCustom(longPartitioner, "f0");
+               DataStream customPartition4 = 
src1.partitionCustom(longPartitioner, new FirstSelector());
+
+               
assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition1.getId(), 
createDownStreamId(customPartition1))));
+               
assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition3.getId(), 
createDownStreamId(customPartition3))));
+               
assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition4.getId(), 
createDownStreamId(customPartition4))));
+
+               assertFalse(isGrouped(customPartition1));
+               assertFalse(isGrouped(customPartition3));
+               assertFalse(isGrouped(customPartition4));
+
                //Testing ConnectedDataStream grouping
                ConnectedDataStream connectedGroup1 = connected.groupBy(0, 0);
                Integer downStreamId1 = createDownStreamId(connectedGroup1);
@@ -524,6 +546,10 @@ public class DataStreamTest {
                return edge.getPartitioner() instanceof FieldsPartitioner;
        }
 
+       private static boolean isCustomPartitioned(StreamEdge edge) {
+               return edge.getPartitioner() instanceof 
CustomPartitionerWrapper;
+       }
+
        private static class FirstSelector implements KeySelector<Tuple2<Long, 
Long>, Long> {
                @Override
                public Long getKey(Tuple2<Long, Long> value) throws Exception {

Reply via email to