[FLINK-2442] [fix] FieldPositionKeys support Pojo fields

This closes #963


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30761572
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30761572
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30761572

Branch: refs/heads/master
Commit: 30761572b5040669b07d261ec9b109797debc549
Parents: b2d8c40
Author: Fabian Hueske <fhue...@apache.org>
Authored: Thu Jul 30 21:44:06 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Aug 4 18:16:30 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/api/java/operators/Keys.java   | 50 ++++++++++----------
 .../api/java/typeutils/TupleTypeInfoBase.java   | 20 --------
 .../flink/api/java/operators/KeysTest.java      | 27 +++++++++++
 3 files changed, 52 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 69d306f..09874e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -223,43 +223,43 @@ public abstract class Keys<T> {
                        } else {
                                groupingFields = 
rangeCheckFields(groupingFields, type.getArity() -1);
                        }
-                       CompositeType<?> compositeType = (CompositeType<?>) 
type;
                        Preconditions.checkArgument(groupingFields.length > 0, 
"Grouping fields can not be empty at this point");
                        
                        keyFields = new 
ArrayList<FlatFieldDescriptor>(type.getTotalFields());
                        // for each key, find the field:
                        for(int j = 0; j < groupingFields.length; j++) {
+                               int keyPos = groupingFields[j];
+
+                               int offset = 0;
                                for(int i = 0; i < type.getArity(); i++) {
-                                       TypeInformation<?> fieldType = 
compositeType.getTypeAt(i);
-                                       
-                                       if(groupingFields[j] == i) { // check 
if user set the key
-                                               int keyId = 
countNestedElementsBefore(compositeType, i) + i;
-                                               if(fieldType instanceof 
TupleTypeInfoBase) {
-                                                       TupleTypeInfoBase<?> 
tupleFieldType = (TupleTypeInfoBase<?>) fieldType;
-                                                       
tupleFieldType.addAllFields(keyId, keyFields);
-                                               } else {
-                                                       
Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field 
type");
-                                                       keyFields.add(new 
FlatFieldDescriptor(keyId, fieldType));
+
+                                       TypeInformation fieldType = 
((CompositeType<?>) type).getTypeAt(i);
+                                       if(i < keyPos) {
+                                               // not yet there, increment key 
offset
+                                               offset += 
fieldType.getTotalFields();
+                                       }
+                                       else {
+                                               // arrived at key position
+                                               if(fieldType instanceof 
CompositeType) {
+                                                       // add all nested 
fields of composite type
+                                                       ((CompositeType) 
fieldType).getFlatFields("*", offset, keyFields);
                                                }
-                                               
+                                               else if(fieldType instanceof 
AtomicType) {
+                                                       // add atomic type field
+                                                       keyFields.add(new 
FlatFieldDescriptor(offset, fieldType));
+                                               }
+                                               else {
+                                                       // type should either 
be composite or atomic
+                                                       throw new 
InvalidProgramException("Field type is neither CompositeType nor AtomicType: 
"+fieldType);
+                                               }
+                                               // go to next key
+                                               break;
                                        }
                                }
                        }
                        keyFields = removeNullElementsFromList(keyFields);
                }
-               
-               private static int countNestedElementsBefore(CompositeType<?> 
compositeType, int pos) {
-                       if( pos == 0) {
-                               return 0;
-                       }
-                       int ret = 0;
-                       for (int i = 0; i < pos; i++) {
-                               TypeInformation<?> fieldType = 
compositeType.getTypeAt(i);
-                               ret += fieldType.getTotalFields() -1;
-                       }
-                       return ret;
-               }
-               
+
                public static <R> List<R> removeNullElementsFromList(List<R> 
in) {
                        List<R> elements = new ArrayList<R>();
                        for(R e: in) {

http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 3314ca9..881e690 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -88,25 +87,6 @@ public abstract class TupleTypeInfoBase<T> extends 
CompositeType<T> {
                return tupleType;
        }
 
-       /**
-        * Recursively add all fields in this tuple type. We need this in 
particular to get all
-        * the types.
-        * @param startKeyId
-        * @param keyFields
-        */
-       public void addAllFields(int startKeyId, List<FlatFieldDescriptor> 
keyFields) {
-               for(int i = 0; i < this.getArity(); i++) {
-                       TypeInformation<?> type = this.types[i];
-                       if(type instanceof AtomicType) {
-                               keyFields.add(new 
FlatFieldDescriptor(startKeyId, type));
-                       } else if(type instanceof TupleTypeInfoBase<?>) {
-                               TupleTypeInfoBase<?> ttb = 
(TupleTypeInfoBase<?>) type;
-                               ttb.addAllFields(startKeyId, keyFields);
-                       }
-                       startKeyId += type.getTotalFields();
-               }
-       }
-
        @Override
        public void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
index 67d0240..cf8936d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple7;
 import 
org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest.ComplexNestedClass;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.junit.Assert;
@@ -254,4 +255,30 @@ public class KeysTest {
                ek = new ExpressionKeys<PojoWithMultiplePojos>(new 
String[]{"i0"}, ti);
                Assert.assertArrayEquals(new int[] {0}, 
ek.computeLogicalKeyPositions());
        }
+
+       @Test
+       public void testTupleWithNestedPojo() {
+
+               TypeInformation<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> 
ti =
+                               new TupleTypeInfo<Tuple3<Integer, Pojo1, 
PojoWithMultiplePojos>>(
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       TypeExtractor.getForClass(Pojo1.class),
+                                       
TypeExtractor.getForClass(PojoWithMultiplePojos.class)
+                               );
+
+               ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> 
ek;
+
+               ek = new ExpressionKeys<Tuple3<Integer, Pojo1, 
PojoWithMultiplePojos>>(new int[]{0}, ti);
+               Assert.assertArrayEquals(new int[] {0}, 
ek.computeLogicalKeyPositions());
+
+               ek = new ExpressionKeys<Tuple3<Integer, Pojo1, 
PojoWithMultiplePojos>>(new int[]{1}, ti);
+               Assert.assertArrayEquals(new int[] {1,2}, 
ek.computeLogicalKeyPositions());
+
+               ek = new ExpressionKeys<Tuple3<Integer, Pojo1, 
PojoWithMultiplePojos>>(new int[]{2}, ti);
+               Assert.assertArrayEquals(new int[] {3,4,5,6,7}, 
ek.computeLogicalKeyPositions());
+
+               ek = new ExpressionKeys<Tuple3<Integer, Pojo1, 
PojoWithMultiplePojos>>(new int[]{}, ti, true);
+               Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6,7}, 
ek.computeLogicalKeyPositions());
+
+       }
 }

Reply via email to