[
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249736#comment-15249736
]
ASF GitHub Bot commented on FLINK-2998:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1838#discussion_r60395945
--- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---
@@ -301,6 +309,51 @@ public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception
}
@Test
+ public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws Exception {
+     /*
+      * UDF Join on tuples with multiple key field positions and same customized distribution
+      */
+
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+     DataSet<Tuple5<Integer, Long, Integer, String, Integer>> ds1 = CollectionDataSets.get5TupleDataSet(env)
+         .map(new MapFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Integer>>() {
+             @Override
+             public Tuple5<Integer, Long, Integer, String, Integer> map(Tuple5<Integer, Long, Integer, String, Long> value) throws Exception {
+                 return new Tuple5<>(value.f0, value.f1, value.f2, value.f3, value.f4.intValue());
+             }
+         });
+
+     DataSet<Tuple3<Integer, Integer, String>> ds2 = CollectionDataSets.get3TupleDataSet(env)
+         .map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
+             @Override
+             public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
+                 return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
+             }
+         });
+
+     env.setParallelism(4);
+     TestDistribution testDis = new TestDistribution();
+     DataSet<Tuple3<Integer, Long, String>> coGrouped =
+         DataSetUtils.partitionByRange(ds1, testDis, 0, 4)
+             .coGroup(DataSetUtils.partitionByRange(ds2, testDis, 0, 1))
+             .where(0, 4)
+             .equalTo(0, 1)
+             .with(new Tuple5Tuple3CoGroup2());
--- End diff --
Please use the same `CoGroupFunction` as in the original test.
> Support range partition comparison for multi input nodes.
> ---------------------------------------------------------
>
> Key: FLINK-2998
> URL: https://issues.apache.org/jira/browse/FLINK-2998
> Project: Flink
> Issue Type: New Feature
> Components: Optimizer
> Reporter: Chengxiang Li
> Priority: Minor
>
> The optimizer may be able to optimize the DAG when it finds that the range
> partitionings of two inputs are equivalent, but it does not support this
> comparison yet.
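
For illustration only, here is a minimal sketch of what such an equivalence check could look at. It is not Flink's optimizer code: `RangePartitionEquivalence`, `RangePartitioning`, and `isEquivalent` are hypothetical names, and a data distribution is simplified to a list of integer boundaries. The assumption is that two inputs count as equivalently partitioned when each is range-partitioned on exactly the key fields used by the coGroup, in the same order, and both share the same boundaries, as in the test above (fields 0, 4 and 0, 1 with one `TestDistribution`).

```java
import java.util.Arrays;
import java.util.List;

public class RangePartitionEquivalence {

    /** Hypothetical stand-in for one input's range partitioning (not a Flink class). */
    static final class RangePartitioning {
        final int[] keyFields;          // key field positions, in partitioning order
        final List<Integer> boundaries; // simplified split points of the distribution

        RangePartitioning(List<Integer> boundaries, int... keyFields) {
            this.boundaries = boundaries;
            this.keyFields = keyFields;
        }
    }

    /**
     * Returns true when both inputs are partitioned on exactly the keys the
     * operator uses (same order) and both use the same boundaries.
     */
    static boolean isEquivalent(RangePartitioning left, int[] leftKeys,
                                RangePartitioning right, int[] rightKeys) {
        return Arrays.equals(left.keyFields, leftKeys)
                && Arrays.equals(right.keyFields, rightKeys)
                && left.boundaries.equals(right.boundaries);
    }

    public static void main(String[] args) {
        List<Integer> shared = Arrays.asList(5, 10, 15);
        RangePartitioning p1 = new RangePartitioning(shared, 0, 4);
        RangePartitioning p2 = new RangePartitioning(shared, 0, 1);
        // Same boundaries, keys match where(0, 4).equalTo(0, 1): prints true
        System.out.println(isEquivalent(p1, new int[] {0, 4}, p2, new int[] {0, 1}));

        RangePartitioning p3 = new RangePartitioning(Arrays.asList(7, 14, 21), 0, 1);
        // Different boundaries: prints false
        System.out.println(isEquivalent(p1, new int[] {0, 4}, p3, new int[] {0, 1}));
    }
}
```

Under these assumptions, a `true` result is the case in which an optimizer could skip repartitioning both inputs before the coGroup; a real check would also have to compare key types and sort orders.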