Repository: flink
Updated Branches:
  refs/heads/master a0e71b88a -> 6187292bc


[FLINK-1490][fix][java-api] Fix incorrect local output sorting of nested types with field position keys.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6187292b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6187292b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6187292b

Branch: refs/heads/master
Commit: 6187292bca47e7a42fe73a3f8fead50d522be060
Parents: a0e71b8
Author: Fabian Hueske <fhue...@apache.org>
Authored: Fri Feb 6 15:11:25 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Feb 6 16:00:50 2015 +0100

----------------------------------------------------------------------
 .../flink/api/java/operators/DataSink.java      | 25 ++++++++---
 .../test/javaApiOperators/DataSinkITCase.java   | 44 ++++++++++++++++----
 .../util/CollectionDataSets.java                | 21 ++++++++++
 3 files changed, 76 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6187292b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 9da5433..e646891 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -115,18 +115,33 @@ public class DataSink<T> {
                        throw new InvalidProgramException("Order key out of tuple bounds.");
                }
 
+               // get flat keys
+               Keys.ExpressionKeys<T> ek;
+               try {
+                       ek = new Keys.ExpressionKeys<T>(new int[]{field}, this.type);
+               } catch(IllegalArgumentException iae) {
+                       throw new InvalidProgramException("Invalid specification of field expression.", iae);
+               }
+               int[] flatKeys = ek.computeLogicalKeyPositions();
+
                if(this.sortKeyPositions == null) {
                        // set sorting info
-                       this.sortKeyPositions = new int[] {field};
-                       this.sortOrders = new Order[] {order};
+                       this.sortKeyPositions = flatKeys;
+                       this.sortOrders = new Order[flatKeys.length];
+                       Arrays.fill(this.sortOrders, order);
                } else {
                        // append sorting info to exising info
-                       int newLength = this.sortKeyPositions.length + 1;
+                       int oldLength = this.sortKeyPositions.length;
+                       int newLength = oldLength + flatKeys.length;
                        this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
                        this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);
-                       this.sortKeyPositions[newLength-1] = field;
-                       this.sortOrders[newLength-1] = order;
+
+                       for(int i=0; i<flatKeys.length; i++) {
+                               this.sortKeyPositions[oldLength+i] = flatKeys[i];
+                               this.sortOrders[oldLength+i] = order;
+                       }
                }
+
                return this;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6187292b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
index d8663e8..6bd678f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
@@ -208,8 +208,8 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 
                final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-               DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds =
-                               CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+               DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds =
+                               CollectionDataSets.getGroupSortedNestedTupleDataSet2(env);
                ds.writeAsText(resultPath)
                                .sortLocalOutput("f0.f1", Order.ASCENDING)
                                .sortLocalOutput("f1", Order.DESCENDING)
@@ -218,13 +218,39 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
                env.execute();
 
                expected =
-                               "((2,1),a)\n" +
-                               "((2,2),b)\n" +
-                               "((1,2),a)\n" +
-                               "((3,3),c)\n" +
-                               "((1,3),a)\n" +
-                               "((3,6),c)\n" +
-                               "((4,9),c)\n";
+                               "((2,1),a,3)\n" +
+                               "((2,2),b,4)\n" +
+                               "((1,2),a,1)\n" +
+                               "((3,3),c,5)\n" +
+                               "((1,3),a,2)\n" +
+                               "((3,6),c,6)\n" +
+                               "((4,9),c,7)\n";
+
+               compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+       }
+
+       @Test
+       public void testTupleSortingNestedDOP1_2() throws Exception {
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds =
+                               CollectionDataSets.getGroupSortedNestedTupleDataSet2(env);
+               ds.writeAsText(resultPath)
+                               .sortLocalOutput(1, Order.ASCENDING)
+                               .sortLocalOutput(2, Order.DESCENDING)
+                               .setParallelism(1);
+
+               env.execute();
+
+               expected =
+                               "((2,1),a,3)\n" +
+                               "((1,3),a,2)\n" +
+                               "((1,2),a,1)\n" +
+                               "((2,2),b,4)\n" +
+                               "((4,9),c,7)\n" +
+                               "((3,6),c,6)\n" +
+                               "((3,3),c,5)\n";
 
                compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6187292b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 632e7c0..ef6b8a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -184,6 +184,27 @@ public class CollectionDataSets {
                return env.fromCollection(data, type);
        }
 
+       public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) {
+
+               List<Tuple3<Tuple2<Integer, Integer>, String, Integer>> data = new ArrayList<Tuple3<Tuple2<Integer, Integer>, String, Integer>>();
+               data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(1, 3), "a", 2));
+               data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(1, 2), "a", 1));
+               data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(2, 1), "a", 3));
+               data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(2, 2), "b", 4));
+               data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(3, 3), "c", 5));
+               data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(3, 6), "c", 6));
+               data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(4, 9), "c", 7));
+
+               TupleTypeInfo<Tuple3<Tuple2<Integer, Integer>, String, Integer>> type = new
+                               TupleTypeInfo<Tuple3<Tuple2<Integer, Integer>, String, Integer>>(
+                               new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.INT_TYPE_INFO
+               );
+
+               return env.fromCollection(data, type);
+       }
+
        public static DataSet<String> getStringDataSet(ExecutionEnvironment env) {
 
                List<String> data = new ArrayList<String>();

Reply via email to