Ron Crocker created FLINK-3697:
----------------------------------

             Summary: keyBy() with nested POJO computes invalid field position indexes
                 Key: FLINK-3697
                 URL: https://issues.apache.org/jira/browse/FLINK-3697
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.0.0
         Environment: MacOS X 10.10
            Reporter: Ron Crocker
            Priority: Minor

Using named keys in keyBy() for nested POJO types results in failure. The indexes for named key fields are used inconsistently with nested POJO types. In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position after (apparently) flattening the structure, but that position is then looked up in the unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.

In the example below, getFlatFields() returns positions 0, 1, and 14. These positions appear correct in the flattened structure of the Data class. However, in {{KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig)}}, a call to {{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results in {{PojoTypeInfo.getTypeAt()}} declaring it out of range, because it compares the flattened position against the number of directly declared fields of the object rather than against the length of the flattened version of that type. Data declares only 7 fields directly, so position 14 is rejected.

Concrete Example:

Consider this graph:
{code}
DataStream<TimesliceData> dataStream = see.addSource(
    new FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), kafkaConsumerProperties));

dataStream
    .flatMap(new DataMapper())
    .keyBy("aaa", "abc", "wxyz")
{code}

{{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes this NativeDataFormat object and extracts individual Data objects:
{code}
public class Data {
    public int aaa;
    public int abc;
    public long wxyz;
    public int t1;
    public int t2;
    public Policy policy;
    public Stats stats;

    public Data() {}
}
{code}

A {{Policy}} object is an instance of this class:
{code}
public class Policy implements MetricStoragePolicy {
    public short a;
    public short b;
    public boolean c;
    public boolean d;

    public Policy() {}
}
{code}

A {{Stats}} object is an instance of this class:
{code}
public class Stats {
    public long count;
    public float a;
    public float b;
    public float c;
    public float d;
    public float e;

    public Stats() {}
}
{code}
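
The index mismatch can be reproduced on paper with a small standalone model. The sketch below is not Flink code and does not call any Flink API: the class {{FlatIndexSketch}} and its helpers are hypothetical names introduced for illustration. It assumes that fields are laid out in name order and that each nested POJO contributes one flat slot per leaf field. That assumption is consistent with the reported positions 0, 1, and 14, but it is only a model of the behavior described above.

```java
import java.util.Map;
import java.util.TreeMap;

/** Standalone model of flat vs. unflattened field positions (hypothetical, not Flink code). */
public class FlatIndexSketch {

    /** Marker for a primitive (leaf) field; nested POJOs are represented as maps. */
    static final Object LEAF = new Object();

    /** Builds a POJO description whose fields are all leaves, ordered by name (assumption). */
    static Map<String, Object> leaves(String... names) {
        Map<String, Object> fields = new TreeMap<>();
        for (String name : names) {
            fields.put(name, LEAF);
        }
        return fields;
    }

    /** Number of flat slots a type occupies: 1 for a leaf, sum of children for a POJO. */
    @SuppressWarnings("unchecked")
    static int flatSize(Object type) {
        if (type == LEAF) {
            return 1;
        }
        int size = 0;
        for (Object child : ((Map<String, Object>) type).values()) {
            size += flatSize(child);
        }
        return size;
    }

    /** Position of a top-level field after flattening all fields that precede it. */
    static int flatPosition(Map<String, Object> pojo, String field) {
        int position = 0;
        for (Map.Entry<String, Object> entry : pojo.entrySet()) {
            if (entry.getKey().equals(field)) {
                return position;
            }
            position += flatSize(entry.getValue());
        }
        throw new IllegalArgumentException("No such field: " + field);
    }

    /** Description of the Data class from this report, with its two nested POJOs. */
    static Map<String, Object> dataType() {
        Map<String, Object> data = leaves("aaa", "abc", "wxyz", "t1", "t2");
        data.put("policy", leaves("a", "b", "c", "d"));
        data.put("stats", leaves("count", "a", "b", "c", "d", "e"));
        return data;
    }

    public static void main(String[] args) {
        Map<String, Object> data = dataType();
        int arity = data.size(); // number of directly declared fields
        for (String key : new String[] {"aaa", "abc", "wxyz"}) {
            int flat = flatPosition(data, key);
            String verdict = flat < arity ? "within" : "outside";
            System.out.println(key + ": flat position " + flat
                    + " is " + verdict + " the unflattened arity " + arity);
        }
    }
}
```

Under these assumptions the model yields flat positions 0, 1, and 14 for the three keys, while the unflattened type has only 7 fields, so a bounds check against the unflattened arity rejects the third key even though its flat position is valid.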