[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15250954#comment-15250954 ] ASF GitHub Bot commented on FLINK-2998: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1838 > Support range partition comparison for multi input nodes. > - > > Key: FLINK-2998 > URL: https://issues.apache.org/jira/browse/FLINK-2998 > Project: Flink > Issue Type: New Feature > Components: Optimizer >Reporter: Chengxiang Li >Priority: Minor > > The optimizer may have an opportunity to optimize the DAG when it > finds that two input range partitionings are equivalent, but we do not support this > comparison yet. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15250727#comment-15250727 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1838#issuecomment-212608296 Merging this PR
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249827#comment-15249827 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1838#issuecomment-212413005 Thanks for the fast update @gallenvara! PR looks good and can be merged if tests pass.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249812#comment-15249812 ] ASF GitHub Bot commented on FLINK-2998: --- Github user gallenvara commented on the pull request: https://github.com/apache/flink/pull/1838#issuecomment-212409952 @fhueske Thanks a lot for the explanation. The relevant code has been modified.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249747#comment-15249747 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1838#issuecomment-212402787 Hi @gallenvara, the tests look good. I left a few comments to make the test code more concise. Regarding your questions: 1) Yes, we could change the `Object[]` to `Tuple`. However, I am not sure whether this would confuse users. `Tuple` is usually used as a data type in programs, while in `DataDistribution` it would be a holder for composite keys. 2) The problem with returning a `Tuple` from a `KeySelector` is that `Tuple` does not implement `Comparable`, as required by `partitionByHash` or `partitionByRange`.
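The `Comparable` requirement in point 2 can be illustrated with a small composite key holder. This is a minimal sketch, not Flink code: `CompositeKey` is a hypothetical class that orders its fields lexicographically (the first differing field decides), which is the kind of ordering a plain `Tuple` does not expose. Whether Flink would accept such a class as a `KeySelector` result is not established in this thread.

```java
import java.util.Arrays;

/**
 * Hypothetical composite key holder (not a Flink class). It implements Comparable
 * by comparing fields lexicographically; a shorter key sorts before a longer key
 * that starts with the same fields.
 */
final class CompositeKey implements Comparable<CompositeKey> {
    private final Comparable<?>[] fields;

    CompositeKey(Comparable<?>... fields) {
        this.fields = fields.clone();
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public int compareTo(CompositeKey other) {
        int shared = Math.min(fields.length, other.fields.length);
        for (int i = 0; i < shared; i++) {
            // Assumes fields at the same position have mutually comparable types.
            int c = ((Comparable) fields[i]).compareTo(other.fields[i]);
            if (c != 0) {
                return c;
            }
        }
        // All shared fields are equal: the shorter key sorts first.
        return Integer.compare(fields.length, other.fields.length);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof CompositeKey && Arrays.equals(fields, ((CompositeKey) o).fields);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(fields);
    }

    @Override
    public String toString() {
        return Arrays.toString(fields);
    }
}
```

For example, `new CompositeKey(1, 2L).compareTo(new CompositeKey(1, 3L))` is negative because the first fields tie and `2L < 3L`.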
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249739#comment-15249739 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1838#discussion_r60396134 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java --- @@ -104,6 +112,51 @@ public void testeUDFJoinOnTuplesWithMultipleKeyFieldPositions() throws Exception } @Test + public void testeUDFJoinOnTuplesWithMultipleKeyFieldPositions2() throws Exception { --- End diff -- The comments on `CoGroupITCase` apply here as well.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249737#comment-15249737 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1838#discussion_r60396078 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java --- @@ -301,6 +309,51 @@ public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception } @Test + public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws Exception { --- End diff -- Can you change the method name to `testCoGroupWithRangePartitioning` and move the method to the bottom of the file (before the helper classes)?
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249736#comment-15249736 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1838#discussion_r60395945 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java --- @@ -301,6 +309,51 @@ public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception } @Test + public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws Exception { + /* +* UDF Join on tuples with multiple key field positions and same customized distribution +*/ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> ds1 = CollectionDataSets.get5TupleDataSet(env) + .map(new MapFunction, Tuple5>() { + @Override + public Tuple5 map(Tuple5 value) throws Exception { + return new Tuple5<>(value.f0, value.f1, value.f2, value.f3, value.f4.intValue()); + } + }); + + DataSet> ds2 = CollectionDataSets.get3TupleDataSet(env) + .map(new MapFunction, Tuple3>() { + @Override + public Tuple3 map(Tuple3 value) throws Exception { + return new Tuple3<>(value.f0, value.f1.intValue(), value.f2); + } + }); + + env.setParallelism(4); + TestDistribution testDis = new TestDistribution(); + DataSet> coGrouped = + DataSetUtils.partitionByRange(ds1, testDis, 0, 4) + .coGroup(DataSetUtils.partitionByRange(ds2, testDis, 0, 1)) + .where(0, 4) + .equalTo(0, 1) + .with(new Tuple5Tuple3CoGroup2()); --- End diff -- Please use the same `CoGroupFunction` as in the original test.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249730#comment-15249730 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1838#discussion_r60395647 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java --- @@ -797,4 +873,46 @@ public void coGroup(Iterable first, Iterable
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249731#comment-15249731 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1838#discussion_r60395706 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java --- @@ -797,4 +873,46 @@ public void coGroup(Iterable first, Iterable
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249729#comment-15249729 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1838#discussion_r60395484 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java --- @@ -301,6 +309,51 @@ public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception } @Test + public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws Exception { + /* +* UDF Join on tuples with multiple key field positions and same customized distribution +*/ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> ds1 = CollectionDataSets.get5TupleDataSet(env) + .map(new MapFunction, Tuple5>() { --- End diff -- Can you keep the original data type, i.e., not convert the last field to Integer? 1) It will check whether the range partitioning handles different types. 2) It will be more concise (no map functions and no additional CoGroupFunction).
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249428#comment-15249428 ] ASF GitHub Bot commented on FLINK-2998: --- Github user gallenvara commented on the pull request: https://github.com/apache/flink/pull/1838#issuecomment-212300584 @fhueske PR updated. I was a little confused when I wrote the tests. The original dataset is processed by a `map` operator to ensure that the type of the partition key is the same as the type of the boundary in the supplied distribution. In the `DataDistribution` interface, the return type of the `getBucketBoundary` method is `Object[]`. My question is whether this can be changed to `Tuple`: when range-partitioning by one field it would return a `Tuple1`, and by two fields a `Tuple2`. Also, in the `OutputEmitter`, the type of the keys would change from `Object[]` to `Tuple`, and keys would be compared with the boundaries using a `Tuple` comparator. If this is possible, the boundaries in the distribution for the range partition test would be: `Tuple2[] boundaries = new Tuple2[]{ new Tuple2(1, 1L), new Tuple2(3, 2L), }` This would make the test more succinct and direct. My other question is why partitionByHash and partitionByRange do not support KeySelectors that return a Tuple type, such as: ``` public static class KeySelector3 implements KeySelector, Tuple2> { private static final long serialVersionUID = 1L; @Override public Tuple2 getKey(Tuple3 in) { return new Tuple2<>(in.f0,in.f1); } } ``` and why the following code cannot run: ``` DataSet> dataSet = ...; dataSet.partitionByRange(new KeySelector3()); ``` Can you explain this to me? Thanks!
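To make the `Object[]` discussion concrete, here is a minimal sketch of routing a composite key to a bucket by comparing it field by field against `Object[]` boundaries. It is not Flink's `OutputEmitter`; `RangeBucketSketch` and its methods are hypothetical. It assumes each key field is `Comparable` to the boundary field at the same position, that keys and boundaries have the same number of fields, and that boundary `i` is the inclusive upper bound of bucket `i` (the bounds checked by the `CustomDistributionITCase` test in this thread).

```java
/** Hypothetical sketch: route composite keys to range buckets. Not Flink's OutputEmitter. */
final class RangeBucketSketch {

    /** Compares a key with a boundary field by field; the first differing field decides. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static int compareKeys(Object[] key, Object[] boundary) {
        for (int i = 0; i < key.length; i++) {
            int c = ((Comparable) key[i]).compareTo(boundary[i]);
            if (c != 0) {
                return c;
            }
        }
        return 0;
    }

    /**
     * Returns the bucket index for a key via binary search over ascending boundaries.
     * boundaries[i] is the inclusive upper bound of bucket i; keys greater than the
     * last boundary go to bucket boundaries.length.
     */
    static int selectBucket(Object[] key, Object[][] boundaries) {
        int lo = 0;
        int hi = boundaries.length; // the answer lies in [lo, hi]
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (compareKeys(key, boundaries[mid]) <= 0) {
                hi = mid;      // key fits in bucket mid or an earlier one
            } else {
                lo = mid + 1;  // key is above boundary mid
            }
        }
        return lo;
    }
}
```

With the two boundaries proposed above, `{1, 1L}` and `{3, 2L}`, the key `{2, 5L}` lands in bucket 1 and `{3, 3L}` in bucket 2. Because the comparison goes through `Comparable`, a mismatch between key and boundary types (say an `Integer` key against a `Long` boundary) would fail with a `ClassCastException`, which is one reason to validate the types up front.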
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247751#comment-15247751 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1838#issuecomment-211923989 Hi @gallenvara, thanks for the update! I tried it locally and it worked as expected. I would like two more test methods, though, to ensure that everything works end-to-end. Could you add one test method to `JoinITCase` which basically extends `testeUDFJoinOnTuplesWithMultipleKeyFieldPositions()` and uses range partitioning? For that, you should provide a DataDistribution and set the parallelism to 4 on the environment. Please do the same with `CoGroupITCase.testCoGroupWithMultipleKeyFieldsWithFieldSelector()`. After that we can merge the PR. Thanks, Fabian
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247745#comment-15247745 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1838#discussion_r60229754 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java --- @@ -84,8 +84,8 @@ public PartitionOperator(DataSet input, Keys pKeys, Partitioner customP Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only neccessary for range partition."); if (distribution != null) { - Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same."); - Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), pKeys.getKeyFieldTypes()), "The types of key from the distribution and range partitioner are not equal."); + Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= distribution.getNumberOfFields(), "The number of key fields in range partitioner should be less than the number in the distribution."); + Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), Arrays.copyOfRange(distribution.getKeyTypes(), 0, pKeys.getNumberOfKeyFields())), "The type array of the partition key should be prefix of the type array of the distribution."); --- End diff -- Oh, maybe I should improve my English skills. The message should read: `"The types of the flat key fields must be equal to the types of the fields of the distribution."`
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15242304#comment-15242304 ] ASF GitHub Bot commented on FLINK-2998: --- Github user gallenvara commented on the pull request: https://github.com/apache/flink/pull/1838#issuecomment-210255818 @fhueske PR updated.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15242303#comment-15242303 ] ASF GitHub Bot commented on FLINK-2998: --- Github user gallenvara commented on a diff in the pull request: https://github.com/apache/flink/pull/1838#discussion_r59819652 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java --- @@ -84,8 +84,8 @@ public PartitionOperator(DataSet input, Keys pKeys, Partitioner customP Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only neccessary for range partition."); if (distribution != null) { - Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same."); - Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), pKeys.getKeyFieldTypes()), "The types of key from the distribution and range partitioner are not equal."); + Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= distribution.getNumberOfFields(), "The number of key fields in range partitioner should be less than the number in the distribution."); + Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), Arrays.copyOfRange(distribution.getKeyTypes(), 0, pKeys.getNumberOfKeyFields())), "The type array of the partition key should be prefix of the type array of the distribution."); --- End diff -- :) I will improve my poor English skills.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241507#comment-15241507 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1838#issuecomment-210049036 Thanks for the update @gallenvara. Aside from a few comments, the PR looks good. No worries about the `GlobalProperties`; the internals of the optimizer are not straightforward ;-). Thanks for adding the tests for the distribution / key length!
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241502#comment-15241502 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1838#discussion_r59752679 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java --- @@ -84,8 +84,8 @@ public PartitionOperator(DataSet input, Keys pKeys, Partitioner customP Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only neccessary for range partition."); if (distribution != null) { - Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same."); - Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), pKeys.getKeyFieldTypes()), "The types of key from the distribution and range partitioner are not equal."); + Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= distribution.getNumberOfFields(), "The number of key fields in range partitioner should be less than the number in the distribution."); + Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), Arrays.copyOfRange(distribution.getKeyTypes(), 0, pKeys.getNumberOfKeyFields())), "The type array of the partition key should be prefix of the type array of the distribution."); --- End diff -- Please split this line and update the message to "The types of the flat key fields must be a equal of the types of the fields of the distribution."
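The two preconditions in this diff (the distribution may have more fields than the key, but the key's types must match the distribution's leading field types) can be sketched in plain Java as below. This is an illustration under assumptions: `DistributionChecks` is hypothetical, plain `Class<?>` objects stand in for Flink's `TypeInformation`, and the error messages are the wordings fhueske suggested in this review.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of the two argument checks discussed for PartitionOperator.
 * Plain Class objects stand in for Flink's TypeInformation.
 */
final class DistributionChecks {

    static void checkKeysAgainstDistribution(Class<?>[] keyTypes, Class<?>[] distributionTypes) {
        // The distribution may describe more fields than the partition key uses, but not fewer.
        if (keyTypes.length > distributionTypes.length) {
            throw new IllegalArgumentException(
                "The distribution must provide at least as many fields as flat key fields are specified.");
        }
        // The key types must equal the leading (prefix) types of the distribution.
        Class<?>[] prefix = Arrays.copyOfRange(distributionTypes, 0, keyTypes.length);
        if (!Arrays.equals(keyTypes, prefix)) {
            throw new IllegalArgumentException(
                "The types of the flat key fields must be equal to the types of the fields of the distribution.");
        }
    }
}
```

Running the count check first matters: `Arrays.copyOfRange` pads with `null` when the end index exceeds the array length, so a key with too many fields would otherwise surface as a confusing type mismatch rather than the count error.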
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241492#comment-15241492 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1838#discussion_r59752212 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java --- @@ -84,8 +84,8 @@ public PartitionOperator(DataSet input, Keys pKeys, Partitioner customP Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only neccessary for range partition."); if (distribution != null) { - Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same."); - Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), pKeys.getKeyFieldTypes()), "The types of key from the distribution and range partitioner are not equal."); + Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= distribution.getNumberOfFields(), "The number of key fields in range partitioner should be less than the number in the distribution."); --- End diff -- Update the message to "The distribution must provide at least as many fields as flat key fields are specified."
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241491#comment-15241491 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1838#discussion_r59752067 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java --- @@ -175,6 +175,77 @@ else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) { env.execute(); } + @Test + public void testPartitionKeyLessDistribution() throws Exception{ + /* +* Test the number of keys less than the number of distribution fields +*/ + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + + DataSet> input1 = CollectionDataSets.get3TupleDataSet(env); + final TestDataDist2 dist = new TestDataDist2(); + + env.setParallelism(dist.getParallelism()); + + DataSet result = DataSetUtils + .partitionByRange(input1, dist, 0) + .mapPartition(new RichMapPartitionFunction, Boolean>() { + + @Override + public void mapPartition(Iterable> values, Collector out) throws Exception { + int pIdx = getRuntimeContext().getIndexOfThisSubtask(); + + for (Tuple3 s : values) { + boolean correctlyPartitioned = true; + if (pIdx == 0) { + Integer[] upper = dist.boundaries[0]; + if (s.f0.compareTo(upper[0]) > 0) { + correctlyPartitioned = false; + } + } else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) { + Integer[] lower = dist.boundaries[pIdx - 1]; + Integer[] upper = dist.boundaries[pIdx]; + if (s.f0.compareTo(upper[0]) > 0 || (s.f0.compareTo(lower[0]) <= 0)) { + correctlyPartitioned = false; + } + } else { + Integer[] lower = dist.boundaries[pIdx - 1]; + if ((s.f0.compareTo(lower[0]) <= 0)) { + correctlyPartitioned = false; + } + } + + if (!correctlyPartitioned) { + fail("Record was not correctly partitioned: " + s.toString()); + } + } + } + } + ); + + result.output(new DiscardingOutputFormat()); + env.execute(); 
+ } + + @Test(expected = IllegalArgumentException.class) + public void testPartitionMoreThanDistribution() throws Exception{ + /* +* Test the number of keys larger than the number of distribution fields +*/ + + ExecutionEnvironment
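The assertion inside `testPartitionKeyLessDistribution` compares a one-field key only against the first field of each boundary, even though the distribution has more fields. A minimal, Flink-free sketch of that check, generalised to any key that is a prefix of the boundary, might look like the following. `PrefixRangeCheck` is hypothetical, and it assumes there are exactly `parallelism - 1` boundaries, so that partition `pIdx` covers the range `(boundaries[pIdx - 1], boundaries[pIdx]]`, as in the test's first/middle/last branches.

```java
/**
 * Hypothetical sketch of the check in testPartitionKeyLessDistribution: a key with
 * fewer fields than the distribution is compared only against the leading boundary fields.
 */
final class PrefixRangeCheck {

    /** Compares only key.length fields; assumes key.length <= boundary.length. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static int comparePrefix(Object[] key, Object[] boundary) {
        for (int i = 0; i < key.length; i++) {
            int c = ((Comparable) key[i]).compareTo(boundary[i]);
            if (c != 0) {
                return c;
            }
        }
        return 0;
    }

    /**
     * True if key lies in partition pIdx: above boundaries[pIdx - 1] (exclusive) and at
     * most boundaries[pIdx] (inclusive). The first partition has no lower bound and the
     * last has no upper bound. Assumes boundaries.length == parallelism - 1.
     */
    static boolean isCorrectlyPartitioned(Object[] key, int pIdx, Object[][] boundaries) {
        int parallelism = boundaries.length + 1;
        boolean aboveLower = pIdx == 0 || comparePrefix(key, boundaries[pIdx - 1]) > 0;
        boolean atMostUpper = pIdx == parallelism - 1 || comparePrefix(key, boundaries[pIdx]) <= 0;
        return aboveLower && atMostUpper;
    }
}
```

With boundaries `{1, 5}` and `{3, 8}` (three partitions), the one-field key `{2}` belongs to partition 1, and `{3}` belongs to partition 1 rather than 2 because a tie on the compared prefix counts as "at most the upper bound".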
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241477#comment-15241477 ] ASF GitHub Bot commented on FLINK-2998: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1838#discussion_r59751421 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java --- @@ -175,6 +175,77 @@ else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) { env.execute(); } + @Test + public void testPartitionKeyLessDistribution() throws Exception{ + /* +* Test the number of keys less than the number of distribution fields +*/ + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + + DataSet> input1 = CollectionDataSets.get3TupleDataSet(env); + final TestDataDist2 dist = new TestDataDist2(); + + env.setParallelism(dist.getParallelism()); + + DataSet result = DataSetUtils + .partitionByRange(input1, dist, 0) + .mapPartition(new RichMapPartitionFunction, Boolean>() { + + @Override --- End diff -- Can you adjust the indentation to match the other test methods?
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241467#comment-15241467 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59750107

--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java ---
@@ -151,10 +151,10 @@ public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlob
 else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
 produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
--- End diff --

Add checks that both distributions are not `null`.
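A minimal sketch of why the null checks requested above matter, using plain `Object` as a hypothetical stand-in for a data distribution (no Flink types involved). Calling `equals` on a missing distribution would throw, and the tempting shortcut `java.util.Objects.equals(d1, d2)` returns `true` when *both* are `null`, which would wrongly treat two range partitionings with unknown boundaries as compatible. `DistributionCheck` is a hypothetical name.

```java
/**
 * Hypothetical sketch: two range-partitioned inputs can only be treated
 * as co-partitioned if both carry a distribution and the two are equal.
 */
final class DistributionCheck {

    private DistributionCheck() {
    }

    static boolean sameDistribution(Object dist1, Object dist2) {
        // Without both distributions the bucket boundaries cannot be
        // compared, so the partitionings must not be considered compatible.
        if (dist1 == null || dist2 == null) {
            return false;
        }
        return dist1.equals(dist2);
    }
}
```

The design choice is deliberately conservative: returning `false` only costs an extra re-partitioning, whereas a false `true` would let the join or coGroup run on misaligned partitions.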
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241464#comment-15241464 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59750088

--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java ---
@@ -142,9 +142,9 @@ public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlob
 else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
 produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
--- End diff --

Add checks that both distributions are not `null`.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241461#comment-15241461 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59749772

--- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java ---
@@ -95,6 +97,32 @@ public int partition(Object key, int numPartitions) {
 assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
 }
+
+ TestDistribution dist1 = new TestDistribution(1);
+ TestDistribution dist2 = new TestDistribution(1);
+
+ // test compatible range partitioning
+ {
--- End diff --

Can you add a check with two keys, one DESC and one ASC? The orders of both inputs should be the same, to check that the partitionings are correctly identified as compatible.
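The check requested above boils down to comparing sort directions position by position: the key *fields* of the two inputs differ (the tests use `keysLeft = (1, 4)` and `keysRight = (3, 1)`), but the direction at each key position must agree for the range buckets to line up. A hypothetical standalone sketch, with its own `Order` enum standing in for Flink's `Order`:

```java
/**
 * Hypothetical sketch of the order comparison a two-key test would
 * exercise: same number of keys, and the same direction at each position.
 */
final class OrderingCheck {

    /** Stand-in for the sort direction of one key field. */
    enum Order { ASCENDING, DESCENDING }

    private OrderingCheck() {
    }

    static boolean sameDirections(Order[] left, Order[] right) {
        // A different number of keys can never describe the same buckets.
        if (left.length != right.length) {
            return false;
        }
        // Compare directions per key position; field indices are ignored.
        for (int i = 0; i < left.length; i++) {
            if (left[i] != right[i]) {
                return false;
            }
        }
        return true;
    }
}
```

A `(DESC, ASC)` ordering on both sides should be accepted, while `(DESC, ASC)` against `(ASC, DESC)` should be rejected; covering both is what turns the single-key test into a meaningful one.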
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15236619#comment-15236619 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-208712697

@fhueske Thanks a lot for your advice. PR updated. Please forgive my limited understanding of the logic of `GlobalProperties`. I added tests to `CustomDistributionITCase` for the cases where the number of partition keys is less than or larger than the number of fields in the distribution.
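The two new integration tests pin down a simple contract, judging by their names and by `testPartitionMoreThanDistribution` expecting an `IllegalArgumentException`: partitioning on fewer keys than the distribution has fields is allowed (only the leading fields are used), while more keys than fields is rejected. A hypothetical sketch of such a validation; `KeyCountCheck` and `checkKeyCount` are not Flink APIs, and where the real check lives is not shown in this thread.

```java
/**
 * Hypothetical validation matching the two tests: the distribution must
 * supply a boundary value for every partition key.
 */
final class KeyCountCheck {

    private KeyCountCheck() {
    }

    static void checkKeyCount(int numKeys, int numDistributionFields) {
        if (numKeys < 1) {
            throw new IllegalArgumentException("At least one partition key is required.");
        }
        // Fewer keys than fields is fine: only the leading fields are used.
        if (numKeys > numDistributionFields) {
            throw new IllegalArgumentException(
                    "The number of partition keys (" + numKeys
                    + ") exceeds the number of distribution fields ("
                    + numDistributionFields + ").");
        }
    }
}
```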
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235383#comment-15235383 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59233583

--- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java ---
@@ -93,7 +95,34 @@ public int partition(Object key, int numPartitions) {
 GlobalProperties propsRight = new GlobalProperties();
 propsRight.setCustomPartitioned(keysRight, part);
- assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ assertTrue(descr1.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+
+ TestDistribution dist1 = new TestDistribution(1);
+ TestDistribution dist2 = new TestDistribution(1);
+ CoGroupDescriptor descr2 = new CoGroupDescriptor(keysLeft, keysRight);
+
+ // test compatible range partitioning
--- End diff --

We also need tests for range partitioning with two (or more) keys and tests for range partitioning with different orders. The tests should check both compatible and incompatible partitionings. The same applies to `JoinGlobalPropertiesCompatibilityTest`.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235380#comment-15235380 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59233074

--- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java ---
@@ -35,7 +37,7 @@ public void checkCompatiblePartitionings() {
 final FieldList keysLeft = new FieldList(1, 4);
 final FieldList keysRight = new FieldList(3, 1);
- CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
+ CoGroupDescriptor descr1 = new CoGroupDescriptor(keysLeft, keysRight);
--- End diff --

Can't we use this object for the new test as well? Then we would not need to rename it.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235379#comment-15235379 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59232978

--- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java ---
@@ -93,7 +95,34 @@ public int partition(Object key, int numPartitions) {
 GlobalProperties propsRight = new GlobalProperties();
 propsRight.setCustomPartitioned(keysRight, part);
- assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ assertTrue(descr1.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+
+ TestDistribution dist1 = new TestDistribution(1);
+ TestDistribution dist2 = new TestDistribution(1);
+ SortMergeInnerJoinDescriptor descr2 = new SortMergeInnerJoinDescriptor(keysLeft, keysRight);
+
+ // test compatible range partitioning
+ {
+ Ordering ordering1 = new Ordering();
+ for (int field : keysLeft) {
+ ordering1.appendOrdering(field, null, Order.ASCENDING);
+ }
+ Ordering ordering2 = new Ordering();
+ for (int field : keysRight) {
+ ordering2.appendOrdering(field, null, Order.ASCENDING);
+ }
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setRangePartitioned(ordering1, dist1);
+ RequestedGlobalProperties reqRigth = new RequestedGlobalProperties();
--- End diff --

Typo: `reqRigth` -> `reqRight`. It was copy-pasted into the CoGroup test as well.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235376#comment-15235376 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59232837

--- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java ---
@@ -35,7 +37,7 @@ public void checkCompatiblePartitionings() {
 final FieldList keysLeft = new FieldList(1, 4);
 final FieldList keysRight = new FieldList(3, 1);
- SortMergeInnerJoinDescriptor descr = new SortMergeInnerJoinDescriptor(keysLeft, keysRight);
+ SortMergeInnerJoinDescriptor descr1 = new SortMergeInnerJoinDescriptor(keysLeft, keysRight);
--- End diff --

Why do you rename this variable? I think it can be used for the new tests as well, no?
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235375#comment-15235375 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59232583

--- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/TestDistribution.java ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class TestDistribution implements DataDistribution {
--- End diff --

I think this class can be simplified a lot for our test purposes. We do not need to provide proper implementations for most of the functionality (`getParallelism`, `getBucketBoundary`, `write`, `read`). `equals` can also be mocked to check for a single value.
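A minimal sketch of the simplification suggested above, written as a standalone class so it compiles without Flink on the classpath. The assumption is that the optimizer's compatibility checks only ever call `equals`/`hashCode` on the distribution, so everything else can fail fast. A real test class would additionally `implement DataDistribution` and stub the remaining interface methods (including `write`/`read`) the same way. `SingleValueDistribution` is a hypothetical name.

```java
/**
 * Hypothetical, stripped-down stand-in for the optimizer tests'
 * TestDistribution: equality is decided by a single int, and the
 * methods the compatibility checks never touch are left unimplemented.
 */
final class SingleValueDistribution {

    private final int id;

    SingleValueDistribution(int id) {
        this.id = id;
    }

    int getParallelism() {
        throw new UnsupportedOperationException("not needed for optimizer tests");
    }

    Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
        throw new UnsupportedOperationException("not needed for optimizer tests");
    }

    @Override
    public boolean equals(Object obj) {
        // Two mock distributions are "the same" iff their ids match.
        return obj instanceof SingleValueDistribution
                && ((SingleValueDistribution) obj).id == this.id;
    }

    @Override
    public int hashCode() {
        // Keep the equals/hashCode contract intact.
        return Integer.hashCode(id);
    }
}
```

With this, `new SingleValueDistribution(1)` on both inputs models a compatible pair and `(1)` vs. `(2)` an incompatible one, which is all the compatibility tests need.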
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235368#comment-15235368 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59231845

--- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinWithDistribution.java ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.optimizer.plan.*;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JoinWithDistribution extends CompilerTestBase {
--- End diff --

Rename to `JoinWithDistributionTest`.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235356#comment-15235356 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-208421293

`GlobalProperties` must copy the `distribution` itself. There are three places where this must be added:
1) `filterBySemanticProperties()`, line 314: `gp.distribution = this.distribution;`
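The comment above names `filterBySemanticProperties()` as one of three places where `GlobalProperties` must carry the distribution along (the other two are not listed in this message). The failure mode is easy to miss: if any method that derives a new properties object forgets the distribution, a range-partitioned input silently loses it and can never be matched against the other input. A hypothetical, heavily simplified sketch; the class, its fields and the string partitioning labels are stand-ins, not Flink's actual code.

```java
/**
 * Hypothetical stand-in for the optimizer's global properties. Every
 * method that derives a new object copies the distribution together
 * with the partitioning it belongs to.
 */
final class GlobalPropsSketch {

    String partitioning;   // stand-in for the partitioning property, e.g. "RANGE_PARTITIONED"
    Object ordering;       // stand-in for the Ordering
    Object distribution;   // stand-in for the DataDistribution, may be null

    /** Plain copy, e.g. when properties are passed through an operator. */
    GlobalPropsSketch copy() {
        GlobalPropsSketch gp = new GlobalPropsSketch();
        gp.partitioning = this.partitioning;
        gp.ordering = this.ordering;
        gp.distribution = this.distribution; // the easy-to-forget line
        return gp;
    }

    /**
     * Sketch of filtering by semantic properties: if the key fields survive
     * the operator, the partitioning and its distribution survive together;
     * otherwise both are dropped.
     */
    GlobalPropsSketch filter(boolean keysPreserved) {
        GlobalPropsSketch gp = new GlobalPropsSketch();
        if (keysPreserved) {
            gp.partitioning = this.partitioning;
            gp.ordering = this.ordering;
            gp.distribution = this.distribution;
        } else {
            gp.partitioning = "RANDOM_PARTITIONED";
        }
        return gp;
    }
}
```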
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235344#comment-15235344 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59229736

--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java ---
@@ -83,6 +83,10 @@ public void setHashPartitioned(FieldList partitionedFields) {
 this.partitioningFields = partitionedFields;
 this.ordering = null;
 }
+
+ public void setDistribution(DataDistribution distribution) {
--- End diff --

Distribution should not be set from outside.
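One way to honor "should not be set from outside" is to drop the public setter and set the distribution only together with a range partitioning, mirroring the `setRangePartitioned(ordering, distribution)` call the tests already use on `RequestedGlobalProperties`. A hypothetical sketch of that encapsulation; the class name, the argument-less `setHashPartitioned()` and the string labels are simplifications, not the real `GlobalProperties` signatures.

```java
/**
 * Hypothetical sketch: no setDistribution(); the distribution is only
 * set as part of a range partitioning and is cleared by every other
 * partitioning setter.
 */
final class EncapsulatedGlobalProps {

    private String partitioning = "RANDOM_PARTITIONED";
    private Object ordering;
    private Object distribution;

    void setRangePartitioned(Object ordering, Object distribution) {
        this.partitioning = "RANGE_PARTITIONED";
        this.ordering = ordering;
        this.distribution = distribution;
    }

    void setHashPartitioned() {
        this.partitioning = "HASH_PARTITIONED";
        this.ordering = null;
        // A hash partitioning has no range boundaries to describe.
        this.distribution = null;
    }

    String getPartitioning() {
        return partitioning;
    }

    Object getOrdering() {
        return ordering;
    }

    Object getDataDistribution() {
        return distribution;
    }
}
```

Tying the distribution to the setter that establishes the range partitioning means it can never be attached to, or left behind on, a partitioning it does not describe.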
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235343#comment-15235343 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59229663

--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java ---
@@ -454,6 +463,14 @@ public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
 }
 }
+ if (child2.getInputs().iterator().hasNext()) {
--- End diff --

Same as above.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235342#comment-15235342 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r59229638

--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java ---
@@ -427,6 +428,14 @@ public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
 }
 }
+ if (child1.getInputs().iterator().hasNext()) {
--- End diff --

This change is not necessary. The data distribution should be set within `GlobalProperties`. I'll point you to the correct places.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15217738#comment-15217738 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-203350617

@fhueske Yes, `TwoInputNode` rebuilds the channels, and the `child` nodes don't have the `data distribution` information. I have added this information to them and updated the PR.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15217621#comment-15217621 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-203307525

I'm currently on vacation. Will have a closer look when I'm back in about a week.

I am not sure that we need to touch the Join and CoGroup operators to pass the distributions. The optimizer is able to get this from the GlobalProperties to decide whether the partitionings are valid and equivalent.
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15217301#comment-15217301 ]

ASF GitHub Bot commented on FLINK-2998:
---

Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-203216796

@fhueske @ChengXiangLi Can you please help with the review? :)
[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.
[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15217300#comment-15217300 ]

ASF GitHub Bot commented on FLINK-2998:
---

GitHub user gallenvara opened a pull request:

https://github.com/apache/flink/pull/1838

[FLINK-2998] Support range partition comparison for multi input nodes.

The PR implements range partition comparison for multi-input operations such as join and coGroup. The optimizer can now optimize the DAG to avoid re-partitioning if it finds that the data distributions supplied by the user are equal.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gallenvara/flink flink-2998

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1838.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #1838

commit 37e6147a829e50ba8a45c26f225e16e7695f6489
Author: gallenvara
Date: 2016-03-29T14:36:21Z

Support range partition comparison for multi input nodes.
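The optimization described in the PR can be summarized as a single decision per join or coGroup: if both inputs are already range-partitioned with the same key directions and equal, non-null user-supplied distributions, the existing partitions can be consumed as they are; otherwise the inputs have to be range-partitioned again. A hypothetical sketch of that decision; its `Strategy` enum only loosely mirrors the `ShipStrategyType` values that the PR's `JoinWithDistribution` test imports, and none of these names are Flink APIs.

```java
/**
 * Hypothetical sketch of the decision enabled by FLINK-2998: reuse two
 * existing range partitionings when they provably line up, otherwise
 * re-partition.
 */
final class ShipStrategyChooser {

    /** Stand-in for the ship strategies relevant here. */
    enum Strategy { FORWARD, PARTITION_RANGE }

    private ShipStrategyChooser() {
    }

    static Strategy choose(boolean leftRange, Object leftDist,
                           boolean rightRange, Object rightDist,
                           boolean sameDirections) {
        boolean reusable = leftRange && rightRange
                // Missing distributions cannot be compared (see the null-check review comments).
                && leftDist != null && rightDist != null
                && sameDirections
                && leftDist.equals(rightDist);
        return reusable ? Strategy.FORWARD : Strategy.PARTITION_RANGE;
    }
}
```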