Repository: flink Updated Branches: refs/heads/master a0e71b88a -> 6187292bc
[FLINK-1490][fix][java-api] Fix incorrect local output sorting of nested types with field position keys. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6187292b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6187292b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6187292b Branch: refs/heads/master Commit: 6187292bca47e7a42fe73a3f8fead50d522be060 Parents: a0e71b8 Author: Fabian Hueske <fhue...@apache.org> Authored: Fri Feb 6 15:11:25 2015 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Feb 6 16:00:50 2015 +0100 ---------------------------------------------------------------------- .../flink/api/java/operators/DataSink.java | 25 ++++++++--- .../test/javaApiOperators/DataSinkITCase.java | 44 ++++++++++++++++---- .../util/CollectionDataSets.java | 21 ++++++++++ 3 files changed, 76 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6187292b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java index 9da5433..e646891 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java @@ -115,18 +115,33 @@ public class DataSink<T> { throw new InvalidProgramException("Order key out of tuple bounds."); } + // get flat keys + Keys.ExpressionKeys<T> ek; + try { + ek = new Keys.ExpressionKeys<T>(new int[]{field}, this.type); + } catch(IllegalArgumentException iae) { + throw new InvalidProgramException("Invalid specification of field expression.", iae); + } + int[] flatKeys = ek.computeLogicalKeyPositions(); + if(this.sortKeyPositions == null) { // set sorting info - this.sortKeyPositions = new int[] {field}; - this.sortOrders = new Order[] {order}; + this.sortKeyPositions = flatKeys; + this.sortOrders = new Order[flatKeys.length]; + Arrays.fill(this.sortOrders, order); } else { // append sorting info to exising info - int newLength = this.sortKeyPositions.length + 1; + int oldLength = this.sortKeyPositions.length; + int newLength = oldLength + flatKeys.length; this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength); this.sortOrders = Arrays.copyOf(this.sortOrders, newLength); - this.sortKeyPositions[newLength-1] = field; - this.sortOrders[newLength-1] = order; + + for(int i=0; i<flatKeys.length; i++) { + this.sortKeyPositions[oldLength+i] = flatKeys[i]; + this.sortOrders[oldLength+i] = order; + } } + return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/6187292b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java index d8663e8..6bd678f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java @@ -208,8 +208,8 @@ public class DataSinkITCase extends MultipleProgramsTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = - CollectionDataSets.getGroupSortedNestedTupleDataSet(env); + DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds = + CollectionDataSets.getGroupSortedNestedTupleDataSet2(env); ds.writeAsText(resultPath) .sortLocalOutput("f0.f1", Order.ASCENDING) .sortLocalOutput("f1", Order.DESCENDING) @@ -218,13 +218,39 @@ public class DataSinkITCase extends MultipleProgramsTestBase { env.execute(); expected = - "((2,1),a)\n" + - "((2,2),b)\n" + - "((1,2),a)\n" + - "((3,3),c)\n" + - "((1,3),a)\n" + - "((3,6),c)\n" + - "((4,9),c)\n"; + "((2,1),a,3)\n" + + "((2,2),b,4)\n" + + "((1,2),a,1)\n" + + "((3,3),c,5)\n" + + "((1,3),a,2)\n" + + "((3,6),c,6)\n" + + "((4,9),c,7)\n"; + + compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); + } + + @Test + public void testTupleSortingNestedDOP1_2() throws Exception { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds = + CollectionDataSets.getGroupSortedNestedTupleDataSet2(env); + ds.writeAsText(resultPath) + .sortLocalOutput(1, Order.ASCENDING) + .sortLocalOutput(2, Order.DESCENDING) + .setParallelism(1); + + env.execute(); + + expected = + "((2,1),a,3)\n" + + "((1,3),a,2)\n" + + "((1,2),a,1)\n" + + "((2,2),b,4)\n" + + "((4,9),c,7)\n" + + "((3,6),c,6)\n" + + "((3,3),c,5)\n"; compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); } http://git-wip-us.apache.org/repos/asf/flink/blob/6187292b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java index 632e7c0..ef6b8a9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java @@ -184,6 +184,27 @@ public class CollectionDataSets { return env.fromCollection(data, type); } + public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) { + + List<Tuple3<Tuple2<Integer, Integer>, String, Integer>> data = new ArrayList<Tuple3<Tuple2<Integer, Integer>, String, Integer>>(); + data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(1, 3), "a", 2)); + data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(1, 2), "a", 1)); + data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(2, 1), "a", 3)); + data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(2, 2), "b", 4)); + data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(3, 3), "c", 5)); + data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(3, 6), "c", 6)); + data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(4, 9), "c", 7)); + + TupleTypeInfo<Tuple3<Tuple2<Integer, Integer>, String, Integer>> type = new + TupleTypeInfo<Tuple3<Tuple2<Integer, Integer>, String, Integer>>( + new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO + ); + + return env.fromCollection(data, type); + } + public static DataSet<String> getStringDataSet(ExecutionEnvironment env) { List<String> data = new ArrayList<String>();