[ https://issues.apache.org/jira/browse/FLINK-2997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15208217#comment-15208217 ]
ASF GitHub Bot commented on FLINK-2997:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1776#discussion_r57140198
--- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
@@ -110,52 +118,93 @@ public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Coll
}
/**
- * The class is used to do the tests of range partition with customed data distribution.
+ * The class is used to do the tests of range partition with one key.
*/
- public static class TestDataDist implements DataDistribution {
+ public static class TestDataDist1 implements DataDistribution {
private int dim;
- public TestDataDist() {}
+ public TestDataDist1() {}
/**
* Constructor of the customized distribution for range partition.
* @param dim the number of the fields.
*/
- public TestDataDist(int dim) {
+ public TestDataDist1(int dim) {
this.dim = dim;
}
public int getParallelism() {
- if (dim == 1) {
- return 3;
- }
- return 6;
+ return 3;
}
@Override
public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
- if (dim == 1) {
- /*
- for the first test, the boundary is just like :
- (0, 7]
- (7, 14]
- (14, 21]
- */
-
- return new Integer[]{(bucketNum + 1) * 7};
- }
+
/*
- for the second test, the boundary is just like :
- (0, 1], (0, 1]
- (1, 3], (1, 2]
- (3, 6], (2, 3]
- (6, 10], (3, 4]
- (10, 15], (4, 5]
- (15, 21], (5, 6]
+ for the first test, the boundary is just like :
+ (0, 7]
+ (7, 14]
+ (14, 21]
*/
+ return new Integer[]{(bucketNum + 1) * 7};
+ }
+
+ @Override
+ public int getNumberOfFields() {
+ return this.dim;
+ }
- return new Integer[]{(bucketNum + 1) * (bucketNum + 2) / 2, bucketNum + 1};
+ @Override
+ public TypeInformation[] getKeyTypes() {
+ return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO};
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeInt(this.dim);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.dim = in.readInt();
+ }
+ }
+
+ /**
+ * The class is used to do the tests of range partition with two keys.
+ */
+ public static class TestDataDist2 implements DataDistribution {
+
+ public int rightBoundary[] = new int[]{6, 4, 9, 1, 2};
+ private int dim;
--- End diff --
`dim` should always be `2`. Remove the field, the constructor, and update
`write()` and `read()`.
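
The suggestion above can be sketched as follows. This is only an illustration, not the code from the pull request: `SimpleDistribution` is a hypothetical stand-in for Flink's `DataDistribution` (so the example compiles without Flink), `TwoKeyDist` is a hypothetical name, and the body of `getBucketBoundary()` is an assumption because the quoted diff ends before that method. The point it shows is that once the number of fields is fixed at `2`, the class has no state left to serialize, so `write()` and `read()` can be empty.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for Flink's DataDistribution, reduced to the
// methods relevant to the review comment.
interface SimpleDistribution {
    Object[] getBucketBoundary(int bucketNum, int totalNumBuckets);
    int getNumberOfFields();
    void write(DataOutput out) throws IOException;
    void read(DataInput in) throws IOException;
}

// Two-key distribution without the `dim` field or its constructor.
class TwoKeyDist implements SimpleDistribution {

    // Values taken from the diff: boundaries for the second key.
    final int[] rightBoundary = new int[]{6, 4, 9, 1, 2};

    // Assumption: the first key's boundary is bucketNum + 1 and the second
    // comes from rightBoundary; the real method body is not in the quoted diff.
    @Override
    public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
        return new Integer[]{bucketNum + 1, rightBoundary[bucketNum]};
    }

    // The number of key fields is a constant, so no field is needed.
    @Override
    public int getNumberOfFields() {
        return 2;
    }

    // Nothing to serialize once `dim` is gone.
    @Override
    public void write(DataOutput out) throws IOException {
    }

    // Nothing to restore either.
    @Override
    public void read(DataInput in) throws IOException {
    }
}

class TwoKeyDistSketch {
    public static void main(String[] args) throws IOException {
        TwoKeyDist dist = new TwoKeyDist();

        // Round-trip through the (empty) serialization methods.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        dist.write(new DataOutputStream(bytes));
        TwoKeyDist copy = new TwoKeyDist();
        copy.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println("bytes written: " + bytes.size()
                + ", fields: " + copy.getNumberOfFields());
    }
}
```

With the field removed, a deserialized copy cannot end up with a different number of fields than the test expects.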
> Support range partition with user customized data distribution.
> ---------------------------------------------------------------
>
> Key: FLINK-2997
> URL: https://issues.apache.org/jira/browse/FLINK-2997
> Project: Flink
> Issue Type: New Feature
> Reporter: Chengxiang Li
>
> This is follow-up work to FLINK-7. Sometimes users have better knowledge of
> the source data, and they can build a customized data distribution to do
> range partitioning more efficiently.
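
To make the idea in the issue description concrete, here is a minimal, Flink-independent sketch of how user-supplied bucket boundaries can map a key to a partition. It uses the boundaries `(0, 7]`, `(7, 14]`, `(14, 21]` from the one-key test in the diff. The class and method names are hypothetical, and sending keys above the last boundary to the last bucket is an assumption of this sketch, not something the source states about Flink's behavior.

```java
import java.util.Arrays;

class RangeBucketSketch {

    // Inclusive upper boundaries of the buckets (0, 7], (7, 14], (14, 21].
    static final int[] UPPER = {7, 14, 21};

    // Returns the index of the first bucket whose upper boundary is >= key.
    // Assumption: keys beyond the last boundary go to the last bucket.
    static int bucketFor(int key) {
        int pos = Arrays.binarySearch(UPPER, key);
        // A negative result encodes the insertion point as -(point) - 1.
        int idx = pos >= 0 ? pos : -(pos + 1);
        return Math.min(idx, UPPER.length - 1);
    }

    public static void main(String[] args) {
        for (int key : new int[]{1, 7, 8, 14, 21}) {
            System.out.println(key + " -> bucket " + bucketFor(key));
        }
    }
}
```

Because the boundaries come from the user rather than from sampling the data, no extra pass over the input is needed to decide where each key goes.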