[ https://issues.apache.org/jira/browse/FLINK-2997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15208217#comment-15208217 ]
ASF GitHub Bot commented on FLINK-2997: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1776#discussion_r57140198 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java --- @@ -110,52 +118,93 @@ public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Coll } /** - * The class is used to do the tests of range partition with customed data distribution. + * The class is used to do the tests of range partition with one key. */ - public static class TestDataDist implements DataDistribution { + public static class TestDataDist1 implements DataDistribution { private int dim; - public TestDataDist() {} + public TestDataDist1() {} /** * Constructor of the customized distribution for range partition. * @param dim the number of the fields. */ - public TestDataDist(int dim) { + public TestDataDist1(int dim) { this.dim = dim; } public int getParallelism() { - if (dim == 1) { - return 3; - } - return 6; + return 3; } @Override public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) { - if (dim == 1) { - /* - for the first test, the boundary is just like : - (0, 7] - (7, 14] - (14, 21] - */ - - return new Integer[]{(bucketNum + 1) * 7}; - } + /* - for the second test, the boundary is just like : - (0, 1], (0, 1] - (1, 3], (1, 2] - (3, 6], (2, 3] - (6, 10], (3, 4] - (10, 15], (4, 5] - (15, 21], (5, 6] + for the first test, the boundary is just like : + (0, 7] + (7, 14] + (14, 21] */ + return new Integer[]{(bucketNum + 1) * 7}; + } + + @Override + public int getNumberOfFields() { + return this.dim; + } - return new Integer[]{(bucketNum + 1) * (bucketNum + 2) / 2, bucketNum + 1}; + @Override + public TypeInformation[] getKeyTypes() { + return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO}; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(this.dim); + } + + 
@Override + public void read(DataInputView in) throws IOException { + this.dim = in.readInt(); + } + } + + /** + * The class is used to do the tests of range partition with two keys. + */ + public static class TestDataDist2 implements DataDistribution { + + public int rightBoundary[] = new int[]{6, 4, 9, 1, 2}; + private int dim; --- End diff -- `dim` should always be `2`. Remove the field, the constructor, and update `write()` and `read()`. > Support range partition with user customized data distribution. > --------------------------------------------------------------- > > Key: FLINK-2997 > URL: https://issues.apache.org/jira/browse/FLINK-2997 > Project: Flink > Issue Type: New Feature > Reporter: Chengxiang Li > > This is follow-up work to FLINK-7. Sometimes users have better knowledge of > the source data, and they can build a customized data distribution to do range > partitioning more efficiently. -- This message was sent by Atlassian JIRA (v6.3.4#6332)