[FLINK-2442] [fix] Make FieldPositionKeys support POJO fields. This closes #963.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30761572 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30761572 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30761572 Branch: refs/heads/master Commit: 30761572b5040669b07d261ec9b109797debc549 Parents: b2d8c40 Author: Fabian Hueske <fhue...@apache.org> Authored: Thu Jul 30 21:44:06 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Aug 4 18:16:30 2015 +0200 ---------------------------------------------------------------------- .../apache/flink/api/java/operators/Keys.java | 50 ++++++++++---------- .../api/java/typeutils/TupleTypeInfoBase.java | 20 -------- .../flink/api/java/operators/KeysTest.java | 27 +++++++++++ 3 files changed, 52 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java index 69d306f..09874e5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java @@ -223,43 +223,43 @@ public abstract class Keys<T> { } else { groupingFields = rangeCheckFields(groupingFields, type.getArity() -1); } - CompositeType<?> compositeType = (CompositeType<?>) type; Preconditions.checkArgument(groupingFields.length > 0, "Grouping fields can not be empty at this point"); keyFields = new ArrayList<FlatFieldDescriptor>(type.getTotalFields()); // for each key, find the field: for(int j = 0; j < groupingFields.length; j++) { + int keyPos = groupingFields[j]; + + int offset = 0; for(int i = 0; i < type.getArity(); i++) { 
- TypeInformation<?> fieldType = compositeType.getTypeAt(i); - - if(groupingFields[j] == i) { // check if user set the key - int keyId = countNestedElementsBefore(compositeType, i) + i; - if(fieldType instanceof TupleTypeInfoBase) { - TupleTypeInfoBase<?> tupleFieldType = (TupleTypeInfoBase<?>) fieldType; - tupleFieldType.addAllFields(keyId, keyFields); - } else { - Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field type"); - keyFields.add(new FlatFieldDescriptor(keyId, fieldType)); + + TypeInformation fieldType = ((CompositeType<?>) type).getTypeAt(i); + if(i < keyPos) { + // not yet there, increment key offset + offset += fieldType.getTotalFields(); + } + else { + // arrived at key position + if(fieldType instanceof CompositeType) { + // add all nested fields of composite type + ((CompositeType) fieldType).getFlatFields("*", offset, keyFields); } - + else if(fieldType instanceof AtomicType) { + // add atomic type field + keyFields.add(new FlatFieldDescriptor(offset, fieldType)); + } + else { + // type should either be composite or atomic + throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: "+fieldType); + } + // go to next key + break; } } } keyFields = removeNullElementsFromList(keyFields); } - - private static int countNestedElementsBefore(CompositeType<?> compositeType, int pos) { - if( pos == 0) { - return 0; - } - int ret = 0; - for (int i = 0; i < pos; i++) { - TypeInformation<?> fieldType = compositeType.getTypeAt(i); - ret += fieldType.getTotalFields() -1; - } - return ret; - } - + public static <R> List<R> removeNullElementsFromList(List<R> in) { List<R> elements = new ArrayList<R>(); for(R e: in) { http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java index 3314ca9..881e690 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.operators.Keys.ExpressionKeys; @@ -88,25 +87,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { return tupleType; } - /** - * Recursively add all fields in this tuple type. We need this in particular to get all - * the types. - * @param startKeyId - * @param keyFields - */ - public void addAllFields(int startKeyId, List<FlatFieldDescriptor> keyFields) { - for(int i = 0; i < this.getArity(); i++) { - TypeInformation<?> type = this.types[i]; - if(type instanceof AtomicType) { - keyFields.add(new FlatFieldDescriptor(startKeyId, type)); - } else if(type instanceof TupleTypeInfoBase<?>) { - TupleTypeInfoBase<?> ttb = (TupleTypeInfoBase<?>) type; - ttb.addAllFields(startKeyId, keyFields); - } - startKeyId += type.getTotalFields(); - } - } - @Override public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java index 67d0240..cf8936d 100644 --- 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java @@ -28,6 +28,7 @@ import org.apache.flink.api.java.operators.Keys.ExpressionKeys; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest.ComplexNestedClass; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.junit.Assert; @@ -254,4 +255,30 @@ public class KeysTest { ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"i0"}, ti); Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions()); } + + @Test + public void testTupleWithNestedPojo() { + + TypeInformation<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> ti = + new TupleTypeInfo<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>( + BasicTypeInfo.INT_TYPE_INFO, + TypeExtractor.getForClass(Pojo1.class), + TypeExtractor.getForClass(PojoWithMultiplePojos.class) + ); + + ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> ek; + + ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{0}, ti); + Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions()); + + ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{1}, ti); + Assert.assertArrayEquals(new int[] {1,2}, ek.computeLogicalKeyPositions()); + + ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{2}, ti); + Assert.assertArrayEquals(new int[] {3,4,5,6,7}, ek.computeLogicalKeyPositions()); + + ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{}, ti, true); + Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6,7}, ek.computeLogicalKeyPositions()); + + } }