[ 
https://issues.apache.org/jira/browse/FLINK-2997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15208217#comment-15208217
 ] 

ASF GitHub Bot commented on FLINK-2997:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r57140198
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -110,52 +118,93 @@ public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Coll
        }
     
        /**
    -    * The class is used to do the tests of range partition with customed data distribution.
    +    * The class is used to do the tests of range partition with one key.
         */
    -   public static class TestDataDist implements DataDistribution {
    +   public static class TestDataDist1 implements DataDistribution {
     
                private int dim;
     
    -           public TestDataDist() {}
    +           public TestDataDist1() {}
     
                /**
                 * Constructor of the customized distribution for range partition.
                 * @param dim the number of the fields.
                 */
    -           public TestDataDist(int dim) {
    +           public TestDataDist1(int dim) {
                        this.dim = dim;
                }
     
                public int getParallelism() {
    -                   if (dim == 1) {
    -                           return 3;
    -                   }
    -                   return 6;
    +                   return 3;
                }
     
                @Override
                public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
    -                   if (dim == 1) {
    -                           /*
    -                           for the first test, the boundary is just like : 
    -                           (0, 7]
    -                           (7, 14]
    -                           (14, 21]
    -                            */
    -
    -                           return new Integer[]{(bucketNum + 1) * 7};
    -                   }
    +
                        /*
    -                   for the second test, the boundary is just like : 
    -                   (0, 1], (0, 1]
    -                   (1, 3], (1, 2]
    -                   (3, 6], (2, 3]
    -                   (6, 10], (3, 4]
    -                   (10, 15], (4, 5]
    -                   (15, 21], (5, 6]
    +                   for the first test, the boundary is just like : 
    +                   (0, 7]
    +                   (7, 14]
    +                   (14, 21]
                         */
    +                   return new Integer[]{(bucketNum + 1) * 7};
    +           }
    +
    +           @Override
    +           public int getNumberOfFields() {
    +                   return this.dim;
    +           }
     
    -                   return new Integer[]{(bucketNum + 1) * (bucketNum + 2) / 2, bucketNum + 1};
    +           @Override
    +           public TypeInformation[] getKeyTypes() {
    +                   return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO};
    +           }
    +
    +           @Override
    +           public void write(DataOutputView out) throws IOException {
    +                   out.writeInt(this.dim);
    +           }
    +
    +           @Override
    +           public void read(DataInputView in) throws IOException {
    +                   this.dim = in.readInt();
    +           }
    +   }
    +
    +   /**
    +    * The class is used to do the tests of range partition with two keys.
    +    */
    +   public static class TestDataDist2 implements DataDistribution {
    +
    +           public int rightBoundary[] = new int[]{6, 4, 9, 1, 2};
    +           private int dim;
    --- End diff --
    
    `dim` should always be `2`. Remove the field, the constructor, and update `write()` and `read()`.
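
One way to read this suggestion, as a minimal sketch rather than the actual change in the pull request: the number of key fields is returned as a constant, and `write()`/`read()` have no state left to serialize. `SimpleDistribution`, `FixedDimSketch`, and `serializedSize` are hypothetical stand-ins introduced only so the example compiles without Flink on the classpath, and `java.io.DataOutput`/`DataInput` stand in for `DataOutputView`/`DataInputView`. The `getBucketBoundary` and `getKeyTypes` methods are left out because their bodies for `TestDataDist2` are not shown in this diff.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class FixedDimSketch {

    /** Hypothetical, trimmed-down stand-in for Flink's DataDistribution interface. */
    interface SimpleDistribution {
        int getNumberOfFields();
        void write(DataOutput out) throws IOException;
        void read(DataInput in) throws IOException;
    }

    /** Two-key test distribution with the key count fixed instead of stored in a field. */
    static class TestDataDist2 implements SimpleDistribution {

        // Taken from the diff; this array is constant, so it is not serialized.
        public int[] rightBoundary = new int[]{6, 4, 9, 1, 2};

        @Override
        public int getNumberOfFields() {
            return 2; // always two keys, so no 'dim' field or constructor is needed
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // Assumption: with 'dim' removed there is no state to write.
        }

        @Override
        public void read(DataInput in) throws IOException {
            // Assumption: nothing was written, so nothing is read back.
        }
    }

    /** Serializes the distribution and returns how many bytes were produced. */
    static int serializedSize(SimpleDistribution dist) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            dist.write(out);
            out.flush();
            return bytes.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TestDataDist2 dist = new TestDataDist2();
        System.out.println("fields: " + dist.getNumberOfFields());
        System.out.println("serialized bytes: " + serializedSize(dist));
    }
}
```

Whether `write()` and `read()` should end up empty or be kept in some other form depends on the rest of the class, which this excerpt does not show.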


> Support range partition with user customized data distribution.
> ---------------------------------------------------------------
>
>                 Key: FLINK-2997
>                 URL: https://issues.apache.org/jira/browse/FLINK-2997
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Chengxiang Li
>
> This is follow-up work to FLINK-7. Sometimes users have better knowledge of
> the source data and can build a customized data distribution to make range
> partitioning more efficient.



