[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249736#comment-15249736
 ] 

ASF GitHub Bot commented on FLINK-2998:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r60395945
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---
    @@ -301,6 +309,51 @@ public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception
        }
     
        @Test
    +   public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws Exception {
    +           /*
    +            * UDF Join on tuples with multiple key field positions and same customized distribution
    +            */
    +
    +           final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +           DataSet<Tuple5<Integer, Long, Integer, String, Integer>> ds1 = CollectionDataSets.get5TupleDataSet(env)
    +                           .map(new MapFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Integer>>() {
    +                                   @Override
    +                                   public Tuple5<Integer, Long, Integer, String, Integer> map(Tuple5<Integer, Long, Integer, String, Long> value) throws Exception {
    +                                           return new Tuple5<>(value.f0, value.f1, value.f2, value.f3, value.f4.intValue());
    +                                   }
    +                           });
    +
    +           DataSet<Tuple3<Integer, Integer, String>> ds2 = CollectionDataSets.get3TupleDataSet(env)
    +                           .map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
    +                                   @Override
    +                                   public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
    +                                           return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
    +                                   }
    +                           });
    +           
    +           env.setParallelism(4);
    +           TestDistribution testDis = new TestDistribution();
    +           DataSet<Tuple3<Integer, Long, String>> coGrouped =
    +                           DataSetUtils.partitionByRange(ds1, testDis, 0, 4)
    +                           .coGroup(DataSetUtils.partitionByRange(ds2, testDis, 0, 1))
    +                           .where(0, 4)
    +                           .equalTo(0, 1)
    +                           .with(new Tuple5Tuple3CoGroup2());
    --- End diff --
    
    Please use the same `CoGroupFunction` as in the original test.


> Support range partition comparison for multi input nodes.
> ---------------------------------------------------------
>
>                 Key: FLINK-2998
>                 URL: https://issues.apache.org/jira/browse/FLINK-2998
>             Project: Flink
>          Issue Type: New Feature
>          Components: Optimizer
>            Reporter: Chengxiang Li
>            Priority: Minor
>
> The optimizer may be able to optimize the DAG when it finds that the range
> partitionings of two inputs are equivalent, but we do not support this
> comparison yet.
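
For readers unfamiliar with the idea, here is a minimal sketch of what "equivalent range partitioning" could mean for two inputs. It is not Flink's optimizer code: the class `RangePartitionSketch`, its methods, and the equivalence rule (same number of key fields and identical bucket boundaries) are all assumptions made for illustration. When two inputs are partitioned this way, equal keys land in the same partition index on both sides, so a co-group would not need another shuffle.

```java
import java.util.Arrays;

// Hypothetical illustration only; none of these names exist in Flink.
public class RangePartitionSketch {

    // Returns the index of the range bucket that holds `key`.
    // `upperBounds` must be sorted ascending; keys above the last bound
    // fall into one extra, final bucket.
    public static int partitionOf(int[] upperBounds, int key) {
        for (int i = 0; i < upperBounds.length; i++) {
            if (key <= upperBounds[i]) {
                return i;
            }
        }
        return upperBounds.length;
    }

    // Assumed equivalence rule: both inputs use the same number of key
    // fields and the same bucket boundaries. The key field positions may
    // differ per input, as with (0, 4) and (0, 1) in the test above.
    public static boolean equivalent(int[] boundsA, int keyArityA,
                                     int[] boundsB, int keyArityB) {
        return keyArityA == keyArityB && Arrays.equals(boundsA, boundsB);
    }

    public static void main(String[] args) {
        int[] left = {10, 20, 30};
        int[] right = {10, 20, 30};
        // Equivalent partitionings send the same key to the same bucket.
        System.out.println(equivalent(left, 2, right, 2));
        System.out.println(partitionOf(left, 15) == partitionOf(right, 15));
    }
}
```

A real implementation would also have to compare key types and the sort order of each field; the sketch leaves those out to keep the core idea visible.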



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
