[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15250954#comment-15250954
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1838


> Support range partition comparison for multi input nodes.
> -
>
> Key: FLINK-2998
> URL: https://issues.apache.org/jira/browse/FLINK-2998
> Project: Flink
>  Issue Type: New Feature
>  Components: Optimizer
>Reporter: Chengxiang Li
>Priority: Minor
>
> The optimizer may have an opportunity to optimize the DAG when it finds 
> that two input range partitionings are equivalent; we do not support this 
> comparison yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15250727#comment-15250727
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-212608296
  
Merging this PR




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249827#comment-15249827
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-212413005
  
Thanks for the fast update @gallenvara! 
PR looks good and can be merged if tests pass.





[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249812#comment-15249812
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-212409952
  
@fhueske Thanks a lot for the explanation. The relevant code has been 
modified.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249747#comment-15249747
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-212402787
  
Hi @gallenvara, the tests look good. I left a few comments to make the test 
code more concise.

Regarding your questions:
1) Yes, we could change the `Object[]` to Tuple. However, I am not sure if 
this would confuse users. Tuple is usually used as a data type in programs 
while in DataDistribution it would be a holder for composite keys. 
2) The problem with returning `Tuple` from a `KeySelector` is that `Tuple` 
does not implement `Comparable`, as required by `partitionByHash` and 
`partitionByRange`. 
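
The second point can be illustrated without Flink. The sketch below is a stand-in, not Flink's API: `KeySelector`, `Pair`, and `orderByKey` are hypothetical names defined here, and `orderByKey` only stands in for a method such as `partitionByRange`. It assumes nothing more than that the method bounds the key type with `Comparable`, as described above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not Flink classes.
interface KeySelector<T, K> {
    K getKey(T value);
}

// A composite key holder that, like the Tuple discussed above, is not Comparable.
final class Pair {
    final int first;
    final long second;

    Pair(int first, long second) {
        this.first = first;
        this.second = second;
    }
}

final class ComparableKeySketch {
    // The bound "K extends Comparable<K>" rejects non-comparable key types at compile time.
    static <T, K extends Comparable<K>> List<T> orderByKey(List<T> input, KeySelector<T, K> selector) {
        List<T> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparing(selector::getKey));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> words = List.of("pear", "fig", "banana");
        // Integer implements Comparable<Integer>, so this key selector is accepted.
        System.out.println(orderByKey(words, String::length));
        // A selector returning Pair does not compile, because Pair is not Comparable:
        // orderByKey(words, w -> new Pair(w.length(), 0L));
    }
}
```

Uncommenting the last call produces a compile-time error rather than a runtime failure, which is the same kind of rejection a `KeySelector` returning `Tuple` runs into.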




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249739#comment-15249739
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r60396134
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
 ---
@@ -104,6 +112,51 @@ public void 
testeUDFJoinOnTuplesWithMultipleKeyFieldPositions() throws Exception
}
 
@Test
+   public void testeUDFJoinOnTuplesWithMultipleKeyFieldPositions2() throws 
Exception {
--- End diff --

The comments on `CoGroupITCase` apply here as well.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249737#comment-15249737
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r60396078
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 ---
@@ -301,6 +309,51 @@ public void 
testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception
}
 
@Test
+   public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws 
Exception {
--- End diff --

Can you change the method name to `testCoGroupWithRangePartitioning` and 
move the method to the bottom of the file (before the helper classes)?




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249736#comment-15249736
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r60395945
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 ---
@@ -301,6 +309,51 @@ public void 
testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception
}
 
@Test
+   public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws 
Exception {
+   /*
+* UDF Join on tuples with multiple key field positions and 
same customized distribution
+*/
+
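+   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+   DataSet<Tuple5<Integer, Long, Integer, String, Integer>> ds1 = CollectionDataSets.get5TupleDataSet(env)
+   .map(new MapFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Integer>>() {
+   @Override
+   public Tuple5<Integer, Long, Integer, String, Integer> map(Tuple5<Integer, Long, Integer, String, Long> value) throws Exception {
+   return new Tuple5<>(value.f0, value.f1, value.f2, value.f3, value.f4.intValue());
+   }
+   });
+
+   DataSet<Tuple3<Integer, Integer, String>> ds2 = CollectionDataSets.get3TupleDataSet(env)
+   .map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
+   @Override
+   public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
+   return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
+   }
+   });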
+   
+   env.setParallelism(4);
+   TestDistribution testDis = new TestDistribution();
+   DataSet> coGrouped =
+   DataSetUtils.partitionByRange(ds1, testDis, 0, 
4)
+   .coGroup(DataSetUtils.partitionByRange(ds2, 
testDis, 0, 1))
+   .where(0, 4)
+   .equalTo(0, 1)
+   .with(new Tuple5Tuple3CoGroup2());
--- End diff --

Please use the same `CoGroupFunction` as in the original test.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249730#comment-15249730
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r60395647
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 ---
@@ -797,4 +873,46 @@ public void coGroup(Iterable first, 
Iterable


[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249731#comment-15249731
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r60395706
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 ---
@@ -797,4 +873,46 @@ public void coGroup(Iterable first, 
Iterable


[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249729#comment-15249729
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r60395484
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 ---
@@ -301,6 +309,51 @@ public void 
testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception
}
 
@Test
+   public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws 
Exception {
+   /*
+* UDF Join on tuples with multiple key field positions and 
same customized distribution
+*/
+
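+   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+   DataSet<Tuple5<Integer, Long, Integer, String, Integer>> ds1 = CollectionDataSets.get5TupleDataSet(env)
+   .map(new MapFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Integer>>() {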
--- End diff --

Can you keep the original data type, i.e., not convert the last field to 
Integer? 
1) It will check whether the range partitioning handles different types.
2) It will be more concise (no map functions and no additional 
CoGroupFunction).




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249428#comment-15249428
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-212300584
  
@fhueske PR updated.
I was a little confused when I wrote the tests. The original dataset is passed 
through a `map` operator to ensure that the type of the partition key is the 
same as the type of the boundaries in the supplied distribution. In the 
`DataDistribution` interface, the return type of `getBucketBoundary` is 
`Object[]`. My question is whether this could be changed to `Tuple`: when range 
partitioning on one field it would return `Tuple1`, and on two fields `Tuple2`. 
`OutputEmitter` would likewise change the type of the keys from `Object[]` to 
`Tuple` and compare the key with the boundary using a `Tuple` comparator. If 
this is possible, the boundaries in the distribution for the rangePartition 
test would be:
`Tuple2[] boundaries = new Tuple2[]{
new Tuple2(1, 1L),
new Tuple2(3, 2L),

}`
This would make the test more succinct and direct.
My other question is why `partitionByHash` and `partitionByRange` do not 
support a `KeySelector` that returns a `Tuple` type, such as:
```
public static class KeySelector3 implements KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> in) {
        return new Tuple2<>(in.f0, in.f1);
    }
}
```
so that the following code cannot run:
```
DataSet<Tuple3<Integer, Long, String>> dataSet = ...;
dataSet.partitionByRange(new KeySelector3());
```
Can you explain it to me? Thanks!
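
The comparison of a composite key against a bucket boundary that the comment above refers to can be sketched in plain Java. This is only an illustration under assumptions: `CompositeKeyCompare` and `compare` are hypothetical names, both arrays are assumed to have the same length, and each pair of fields is assumed to be mutually comparable. It is not the code in Flink's `OutputEmitter`.

```java
final class CompositeKeyCompare {
    // Compares two composite keys field by field; the first differing field decides.
    // Assumes equal lengths and that key[i] and boundary[i] have the same Comparable type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static int compare(Object[] key, Object[] boundary) {
        for (int i = 0; i < key.length; i++) {
            int c = ((Comparable) key[i]).compareTo(boundary[i]);
            if (c != 0) {
                return c;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        Object[] boundary = {3, 2L};
        // Same first field, smaller second field: the key sorts before the boundary.
        System.out.println(compare(new Object[] {3, 1L}, boundary) < 0);
        // Larger first field: the key sorts after the boundary, whatever the second field is.
        System.out.println(compare(new Object[] {4, 0L}, boundary) > 0);
    }
}
```

A typed tuple comparator would perform the same field-by-field walk; the difference is that the casts in this sketch would be checked by the compiler instead of at runtime.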




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247751#comment-15247751
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-211923989
  
Hi @gallenvara, thanks for the update! I tried it locally and it worked as 
expected. 

I would like two more test methods though, to ensure that the feature is 
working end-to-end.
Could you add one test method to `JoinITCase` which basically extends 
`testeUDFJoinOnTuplesWithMultipleKeyFieldPositions()` and uses range 
partitioning? For that you should provide a `DataDistribution` and set the 
parallelism to 4 on the environment. 
Please do the same with 
`CoGroupITCase.testCoGroupWithMultipleKeyFieldsWithFieldSelector()`.
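
How bucket boundaries and a parallelism of 4 fit together can be sketched as follows. This is a simplified illustration, not Flink's implementation: `RangeBucketSketch`, `partitionFor`, and the boundary values are hypothetical. It assumes a single integer key field and that each boundary is the inclusive upper bound of its partition, so three boundaries yield four partitions.

```java
final class RangeBucketSketch {
    // boundaries[i] is the inclusive upper bound of partition i;
    // the last partition (index boundaries.length) has no upper bound.
    static int partitionFor(int key, int[] boundaries) {
        for (int i = 0; i < boundaries.length; i++) {
            if (key <= boundaries[i]) {
                return i;
            }
        }
        return boundaries.length;
    }

    public static void main(String[] args) {
        int[] boundaries = {5, 10, 15}; // hypothetical values: 3 boundaries for parallelism 4
        for (int key : new int[] {1, 5, 6, 15, 16}) {
            System.out.println(key + " -> partition " + partitionFor(key, boundaries));
        }
    }
}
```

When both inputs of a join or coGroup are partitioned with the same boundaries on their key fields, records with equal keys land in the same partition index on both sides.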

After that we can merge the PR. Thanks, Fabian




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247745#comment-15247745
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r60229754
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
@@ -84,8 +84,8 @@ public PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customP
Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");

if (distribution != null) {
-   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
-   
Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), 
pKeys.getKeyFieldTypes()), "The types of key from the distribution and range 
partitioner are not equal.");
+   
Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= 
distribution.getNumberOfFields(), "The number of key fields in range 
partitioner should be less than the number in the distribution.");
+   
Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), 
Arrays.copyOfRange(distribution.getKeyTypes(), 0, 
pKeys.getNumberOfKeyFields())), "The type array of the partition key should be 
prefix of the type array of the distribution.");
--- End diff --

Oh, maybe I should improve my English skills. The message should read like: 
`"The types of the flat key fields must be equal to the types of the fields of 
the distribution."`
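
The two checks in the diff above (at most as many key fields as distribution fields, and key types forming a prefix of the distribution types) can be tried out in isolation. The sketch below is an assumption-laden stand-in: it uses plain `Class<?>` arrays instead of Flink's type information, and `KeyTypePrefixCheck` and `checkKeyTypes` are hypothetical names. The exception messages are the ones suggested in this review.

```java
import java.util.Arrays;

final class KeyTypePrefixCheck {
    // Accepts key types that match the leading fields of the distribution types.
    static void checkKeyTypes(Class<?>[] keyTypes, Class<?>[] distributionTypes) {
        if (keyTypes.length > distributionTypes.length) {
            throw new IllegalArgumentException(
                "The distribution must provide at least as many fields as flat key fields are specified.");
        }
        // Only the first keyTypes.length distribution fields take part in the comparison.
        Class<?>[] prefix = Arrays.copyOfRange(distributionTypes, 0, keyTypes.length);
        if (!Arrays.equals(keyTypes, prefix)) {
            throw new IllegalArgumentException(
                "The types of the flat key fields must be equal to the types of the fields of the distribution.");
        }
    }

    public static void main(String[] args) {
        Class<?>[] distribution = {Integer.class, Long.class};
        // One key field against a two-field distribution passes both checks.
        checkKeyTypes(new Class<?>[] {Integer.class}, distribution);
        try {
            // A Long key does not match the first distribution field (Integer).
            checkKeyTypes(new Class<?>[] {Long.class}, distribution);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```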




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15242304#comment-15242304
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-210255818
  
@fhueske PR updated.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15242303#comment-15242303
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user gallenvara commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59819652
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
@@ -84,8 +84,8 @@ public PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customP
Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");

if (distribution != null) {
-   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
-   
Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), 
pKeys.getKeyFieldTypes()), "The types of key from the distribution and range 
partitioner are not equal.");
+   
Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= 
distribution.getNumberOfFields(), "The number of key fields in range 
partitioner should be less than the number in the distribution.");
+   
Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), 
Arrays.copyOfRange(distribution.getKeyTypes(), 0, 
pKeys.getNumberOfKeyFields())), "The type array of the partition key should be 
prefix of the type array of the distribution.");
--- End diff --

:) I will improve my poor English skills.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241507#comment-15241507
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-210049036
  
Thanks for the update @gallenvara. Aside from a few comments the PR looks 
good. No worries about the `GlobalProperties`. The internals of the optimizer 
are not straightforward ;-). 
Thanks for adding the tests for the distribution / key length!





[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241502#comment-15241502
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59752679
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
@@ -84,8 +84,8 @@ public PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customP
Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");

if (distribution != null) {
-   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
-   
Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), 
pKeys.getKeyFieldTypes()), "The types of key from the distribution and range 
partitioner are not equal.");
+   
Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= 
distribution.getNumberOfFields(), "The number of key fields in range 
partitioner should be less than the number in the distribution.");
+   
Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), 
Arrays.copyOfRange(distribution.getKeyTypes(), 0, 
pKeys.getNumberOfKeyFields())), "The type array of the partition key should be 
prefix of the type array of the distribution.");
--- End diff --

Please split this line and update the message to "The types of the flat key 
fields must be a equal of the types of the fields of the distribution."




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241492#comment-15241492
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59752212
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
@@ -84,8 +84,8 @@ public PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customP
Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");

if (distribution != null) {
-   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
-   
Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), 
pKeys.getKeyFieldTypes()), "The types of key from the distribution and range 
partitioner are not equal.");
+   
Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= 
distribution.getNumberOfFields(), "The number of key fields in range 
partitioner should be less than the number in the distribution.");
--- End diff --

Update the message to "The distribution must provide at least as many 
fields as flat key fields are specified."




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241491#comment-15241491
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59752067
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
 ---
@@ -175,6 +175,77 @@ else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) 
{
env.execute();
}
 
+   @Test
+   public void testPartitionKeyLessDistribution() throws Exception{
+       /*
+        * Test the number of keys less than the number of distribution fields
+        */
+
+       ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+
+       DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
+       final TestDataDist2 dist = new TestDataDist2();
+
+       env.setParallelism(dist.getParallelism());
+
+       DataSet<Boolean> result = DataSetUtils
+           .partitionByRange(input1, dist, 0)
+           .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
+
+               @Override
+               public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
+                   int pIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+                   for (Tuple3<Integer, Long, String> s : values) {
+                       boolean correctlyPartitioned = true;
+                       if (pIdx == 0) {
+                           Integer[] upper = dist.boundaries[0];
+                           if (s.f0.compareTo(upper[0]) > 0) {
+                               correctlyPartitioned = false;
+                           }
+                       } else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
+                           Integer[] lower = dist.boundaries[pIdx - 1];
+                           Integer[] upper = dist.boundaries[pIdx];
+                           if (s.f0.compareTo(upper[0]) > 0 || (s.f0.compareTo(lower[0]) <= 0)) {
+                               correctlyPartitioned = false;
+                           }
+                       } else {
+                           Integer[] lower = dist.boundaries[pIdx - 1];
+                           if ((s.f0.compareTo(lower[0]) <= 0)) {
+                               correctlyPartitioned = false;
+                           }
+                       }
+
+                       if (!correctlyPartitioned) {
+                           fail("Record was not correctly partitioned: " + s.toString());
+                       }
+                   }
+               }
+           }
+       );
+
+       result.output(new DiscardingOutputFormat<Boolean>());
+       env.execute();
+   }
+
+   @Test(expected = IllegalArgumentException.class)
+   public void testPartitionMoreThanDistribution() throws Exception{
+       /*
+        * Test the number of keys larger than the number of distribution fields
+        */
+
+   ExecutionEnvironment

[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241477#comment-15241477
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59751421
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
 ---
@@ -175,6 +175,77 @@ else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
         env.execute();
     }
 
+    @Test
+    public void testPartitionKeyLessDistribution() throws Exception{
+        /*
+         * Test the number of keys less than the number of distribution fields
+         */
+
+        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+
+        DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
+        final TestDataDist2 dist = new TestDataDist2();
+
+        env.setParallelism(dist.getParallelism());
+
+        DataSet<Boolean> result = DataSetUtils
+            .partitionByRange(input1, dist, 0)
+            .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
+
+                @Override
--- End diff --

Can you adjust the indentation to match the other test methods?
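
As a side note, the three-way bounds check in this test could be condensed into a single predicate. The sketch below is only an illustration and is not part of the PR: `PartitionBounds`, `inPartition`, and the flat `int[]` of upper bounds are hypothetical simplifications of `dist.boundaries`, and the sketch assumes a parallelism of at least two.

```java
// Hypothetical helper, not part of the PR. bounds[i] is the inclusive upper
// bound of partition i; the last partition has no upper bound.
final class PartitionBounds {
    static boolean inPartition(int key, int pIdx, int[] bounds, int parallelism) {
        // Every partition except the first must lie strictly above the previous bound.
        boolean aboveLower = pIdx == 0 || key > bounds[pIdx - 1];
        // Every partition except the last must not exceed its own bound.
        boolean withinUpper = pIdx == parallelism - 1 || key <= bounds[pIdx];
        return aboveLower && withinUpper;
    }

    public static void main(String[] args) {
        int[] bounds = {3, 7}; // three partitions: (..3], (3..7], (7..)
        System.out.println(inPartition(4, 1, bounds, 3));
        System.out.println(inPartition(4, 0, bounds, 3));
    }
}
```

With a helper of this kind, the loop body in the test would presumably shrink to one `fail(...)` call guarded by `!inPartition(...)`.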




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241467#comment-15241467
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59750107
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
 ---
@@ -151,10 +151,10 @@ public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlob
else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
--- End diff --

Add checks that both distributions are not `null`.
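
A minimal sketch of the requested guard, assuming the produced properties expose their distribution as a nullable field. `Partitioning`, `Props`, and `sameRangeDistribution` are hypothetical stand-ins for illustration and are not Flink's actual classes:

```java
// Hypothetical stand-ins for illustration; these are not Flink's classes.
enum Partitioning { HASH_PARTITIONED, RANGE_PARTITIONED }

final class Props {
    final Partitioning partitioning;
    final Object distribution; // may be null when no distribution was supplied

    Props(Partitioning partitioning, Object distribution) {
        this.partitioning = partitioning;
        this.distribution = distribution;
    }
}

final class RangeCompatibility {
    // Two range partitionings are only treated as co-partitioned when both
    // sides carry a distribution and the two distributions are equal.
    static boolean sameRangeDistribution(Props produced1, Props produced2) {
        return produced1.partitioning == Partitioning.RANGE_PARTITIONED
            && produced2.partitioning == Partitioning.RANGE_PARTITIONED
            && produced1.distribution != null
            && produced2.distribution != null
            && produced1.distribution.equals(produced2.distribution);
    }
}
```

The explicit null checks make the intent visible: two inputs that are both range partitioned, but without a known distribution, are not reported as compatible.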




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241464#comment-15241464
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59750088
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java
 ---
@@ -142,9 +142,9 @@ public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlob
else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
--- End diff --

Add checks that both distributions are not `null`.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241461#comment-15241461
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59749772
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
 ---
@@ -95,6 +97,32 @@ public int partition(Object key, int numPartitions) {

assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
}
+
+   TestDistribution dist1 = new TestDistribution(1);
+   TestDistribution dist2 = new TestDistribution(1);
+   
+   // test compatible range partitioning
+   {
--- End diff --

Can you add a check with two keys, one DESC and one ASC? 
The orders should be the same on both inputs, to verify that the partitionings 
are correctly identified as compatible.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15236619#comment-15236619
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-208712697
  
@fhueske Thanks a lot for your advice. PR updated. Please forgive my 
limited understanding of the logic of `GlobalProperties`. I added tests to 
`CustomDistributionITCase` for the cases where the number of partition keys is 
less or larger than the number of fields in the distribution.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235383#comment-15235383
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59233583
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
 ---
@@ -93,7 +95,34 @@ public int partition(Object key, int numPartitions) {
GlobalProperties propsRight = new GlobalProperties();
propsRight.setCustomPartitioned(keysRight, part);

-   assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+   assertTrue(descr1.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+   }
+
+   TestDistribution dist1 = new TestDistribution(1);
+   TestDistribution dist2 = new TestDistribution(1);
+   CoGroupDescriptor descr2 = new CoGroupDescriptor(keysLeft, keysRight);
+   
+   // test compatible range partitioning
--- End diff --

We also need tests for range partitioning with two (or more) keys and 
tests for range partitioning with different orders.
The tests should check for both compatible and incompatible partitionings.
The same applies to `JoinGlobalPropertiesCompatibilityTest`.
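
To illustrate what such test cases have to distinguish, here is a small self-contained sketch. It is a simplified model and not Flink code: `Direction` and `RangeOrderCheck` are hypothetical names, and Flink's own `Ordering` and `Order` types carry more information than a bare array of directions.

```java
// Hypothetical, simplified model; Flink's Ordering and Order types are richer.
enum Direction { ASCENDING, DESCENDING }

final class RangeOrderCheck {
    // The key fields may sit at different positions on each input (for example
    // fields 1,4 on the left and 3,1 on the right), so only the sort direction
    // of the i-th key on each side is compared.
    static boolean sameDirections(Direction[] left, Direction[] right) {
        if (left.length != right.length) {
            return false;
        }
        for (int i = 0; i < left.length; i++) {
            if (left[i] != right[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Direction[] descAsc = {Direction.DESCENDING, Direction.ASCENDING};
        Direction[] ascDesc = {Direction.ASCENDING, Direction.DESCENDING};
        System.out.println(sameDirections(descAsc, descAsc.clone())); // compatible case
        System.out.println(sameDirections(descAsc, ascDesc));         // incompatible case
    }
}
```

A complete compatibility test would presumably combine a check like this with an equality check on the two distributions.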




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235380#comment-15235380
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59233074
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
 ---
@@ -35,7 +37,7 @@ public void checkCompatiblePartitionings() {
final FieldList keysLeft = new FieldList(1, 4);
final FieldList keysRight = new FieldList(3, 1);

-   CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
+   CoGroupDescriptor descr1 = new CoGroupDescriptor(keysLeft, keysRight);
--- End diff --

Can't we use this object for the new test as well? Then we would not need 
to rename it.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235379#comment-15235379
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59232978
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
 ---
@@ -93,7 +95,34 @@ public int partition(Object key, int numPartitions) {
GlobalProperties propsRight = new GlobalProperties();
propsRight.setCustomPartitioned(keysRight, part);

-   assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+   assertTrue(descr1.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+   }
+
+   TestDistribution dist1 = new TestDistribution(1);
+   TestDistribution dist2 = new TestDistribution(1);
+   SortMergeInnerJoinDescriptor descr2 = new SortMergeInnerJoinDescriptor(keysLeft, keysRight);
+   
+   // test compatible range partitioning
+   {
+   Ordering ordering1 = new Ordering();
+   for (int field : keysLeft) {
+   ordering1.appendOrdering(field, null, Order.ASCENDING);
+   }
+   Ordering ordering2 = new Ordering();
+   for (int field : keysRight) {
+   ordering2.appendOrdering(field, null, Order.ASCENDING);
+   }
+
+   RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+   reqLeft.setRangePartitioned(ordering1, dist1);
+   RequestedGlobalProperties reqRigth = new RequestedGlobalProperties();
--- End diff --

Typo: `reqRigth` -> `reqRight`.
The typo was copy-pasted into the CoGroup test as well.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235376#comment-15235376
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59232837
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
 ---
@@ -35,7 +37,7 @@ public void checkCompatiblePartitionings() {
final FieldList keysLeft = new FieldList(1, 4);
final FieldList keysRight = new FieldList(3, 1);
 
-   SortMergeInnerJoinDescriptor descr = new SortMergeInnerJoinDescriptor(keysLeft, keysRight);
+   SortMergeInnerJoinDescriptor descr1 = new SortMergeInnerJoinDescriptor(keysLeft, keysRight);
--- End diff --

Why do you rename this variable? I think it can be used for the new tests 
as well, no?




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235375#comment-15235375
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59232583
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/TestDistribution.java
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class TestDistribution implements DataDistribution {
--- End diff --

I think this class can be simplified a lot for our test purposes. We do not 
need to provide proper implementations for most of the functionality 
(`getParallelism`, `getBucketBoundary`, `write`, `read`). `equals` can also be 
mocked to check for a single value.
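
A rough sketch of what such a reduced test class might look like. To keep it compilable on its own, it implements a hypothetical cut-down interface (`SimpleDistribution`) rather than Flink's `DataDistribution`; the name `StubDistribution` and the single `id` field are likewise assumptions.

```java
// Hypothetical cut-down interface standing in for Flink's DataDistribution,
// so that the sketch compiles on its own.
interface SimpleDistribution {
    Object[] getBucketBoundary(int bucketNum, int totalNumBuckets);
    int getNumberOfFields();
}

// Test-only distribution: its identity is a single int, everything else is stubbed.
final class StubDistribution implements SimpleDistribution {
    private final int id;

    StubDistribution(int id) {
        this.id = id;
    }

    @Override
    public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
        return new Object[0]; // not needed by the compatibility tests
    }

    @Override
    public int getNumberOfFields() {
        return 1;
    }

    // Equality is decided by the single id value only.
    @Override
    public boolean equals(Object obj) {
        return obj instanceof StubDistribution && ((StubDistribution) obj).id == id;
    }

    @Override
    public int hashCode() {
        return id;
    }
}
```

Two instances created with the same id then compare as equal, which is all the compatibility tests need from the distribution.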




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235368#comment-15235368
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59231845
  
--- Diff: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinWithDistribution.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.optimizer.plan.*;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JoinWithDistribution extends CompilerTestBase {
--- End diff --

Rename to `JoinWithDistributionTest`




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235356#comment-15235356
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-208421293
  
`GlobalProperties` must copy the `distribution` itself. There are three 
places where this must be added:
1) `filterBySemanticProperties()`, line 314: `gp.distribution = 
this.distribution;`
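
The point about copying can be illustrated with a reduced model. This is only a sketch under assumptions: `PropsSketch` and its two fields are hypothetical and stand in for the real `GlobalProperties`, which has more state.

```java
// Hypothetical, reduced model of a properties object; only the two fields
// needed for the illustration are kept, and the names are assumptions.
final class PropsSketch {
    String partitioning = "RANDOM";
    Object distribution; // stays null unless a distribution was supplied

    // Any method that derives a new instance has to carry the distribution
    // over explicitly; otherwise the derived properties silently lose it.
    PropsSketch copy() {
        PropsSketch gp = new PropsSketch();
        gp.partitioning = this.partitioning;
        gp.distribution = this.distribution;
        return gp;
    }
}
```

If the assignment to `gp.distribution` were left out, a later comparison of two derived property objects would see `null` on both sides and could no longer tell whether the partitionings are equivalent.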




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235344#comment-15235344
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59229736
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
 ---
@@ -83,6 +83,10 @@ public void setHashPartitioned(FieldList partitionedFields) {
this.partitioningFields = partitionedFields;
this.ordering = null;
}
+   
+   public void setDistribution(DataDistribution distribution) {
--- End diff --

Distribution should not be set from outside.
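
One way to read this remark is that the distribution belongs to the range partitioning and should be supplied together with it. The sketch below illustrates that design under assumptions: `EncapsulatedProps` and its method signatures are hypothetical, and the distribution is modelled as a plain `Object`.

```java
// Hypothetical sketch; the class and method names are assumptions and the
// distribution is modelled as a plain Object.
final class EncapsulatedProps {
    private String partitioning = "RANDOM";
    private Object distribution;

    // The distribution only makes sense for range partitioning, so it is
    // accepted here rather than through a separate public setter.
    void setRangePartitioned(Object distribution) {
        this.partitioning = "RANGE";
        this.distribution = distribution;
    }

    // In this sketch, switching to another partitioning drops the stale distribution.
    void setHashPartitioned() {
        this.partitioning = "HASH";
        this.distribution = null;
    }

    String getPartitioning() { return partitioning; }
    Object getDistribution() { return distribution; }
}
```

Keeping the field private and writing it only in the partitioning setters means a caller cannot end up with, say, a hash partitioning that still carries a range distribution.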




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235343#comment-15235343
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59229663
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java 
---
@@ -454,6 +463,14 @@ public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
}
}

+   if (child2.getInputs().iterator().hasNext()) {
--- End diff --

Same as above.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235342#comment-15235342
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59229638
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java 
---
@@ -427,6 +428,14 @@ public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
}
}

+   if (child1.getInputs().iterator().hasNext()) {
--- End diff --

This change is not necessary. The data distribution should be set within 
`GlobalProperties`. I'll point you to the correct places.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-03-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15217738#comment-15217738
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-203350617
  
@fhueske Yes, `TwoInputNode` rebuilds the channels, and the `child` nodes don't 
have the data distribution information. I have added that information to 
them and updated the PR.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-03-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15217621#comment-15217621
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-203307525
  
I'm currently on vacation. Will have a closer look when I'm back in about a 
week.

I am not sure that we need to touch the Join and CoGroup operators to pass 
the distributions. The optimizer is able to get this from the GlobalProperties 
to decide whether the partitionings are valid and equivalent.




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15217301#comment-15217301
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-203216796
  
@fhueske @ChengXiangLi Could you please help with the review? :)




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15217300#comment-15217300
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

GitHub user gallenvara opened a pull request:

https://github.com/apache/flink/pull/1838

[FLINK-2998] Support range partition comparison for multi input nodes.

This PR implements range partition comparison for multi-input operations such 
as join and cogroup. The optimizer can now optimize the DAG to avoid 
re-partitioning if it finds that the user-supplied data distributions are equal.
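
The effect on the plan can be pictured with a toy decision function. This is a sketch under assumptions, not the optimizer's actual logic: `Ship` and `ShipChoice` are hypothetical names, and distributions are modelled as plain objects compared with `equals`.

```java
// Hypothetical names; this toy enum only models the two outcomes of interest.
enum Ship { FORWARD, PARTITION_RANGE }

final class ShipChoice {
    // If the input is already range partitioned by an equal distribution,
    // the data can be forwarded as is; otherwise it must be re-partitioned.
    static Ship choose(Object producedDistribution, Object requestedDistribution) {
        boolean reusable = producedDistribution != null
            && producedDistribution.equals(requestedDistribution);
        return reusable ? Ship.FORWARD : Ship.PARTITION_RANGE;
    }
}
```

In this model, a join whose two inputs were both partitioned with equal distributions would keep a forward connection on each side instead of shuffling the data again.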

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gallenvara/flink flink-2998

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1838.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1838


commit 37e6147a829e50ba8a45c26f225e16e7695f6489
Author: gallenvara 
Date:   2016-03-29T14:36:21Z

Support range partition comparison for multi input nodes.



