Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3020#discussion_r93465353 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java --- @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) { } } + public RowTypeInfo(List<TypeInformation<?>> types, List<String> fieldNames) { + super(Row.class, types == null ? null : types.toArray(new TypeInformation[types.size()])); + checkNotNull(fieldNames, "FieldNames should not be null."); + checkArgument( + types.size() == fieldNames.size(), + "Number of field types and names is different."); + checkArgument( + types.size() == new HashSet<>(fieldNames).size(), + "Field names are not unique."); + + this.fieldNames = new String[fieldNames.size()]; + + for (int i = 0; i < fieldNames.size(); i++) { + this.fieldNames[i] = fieldNames.get(i); + } + } + + @Override + public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { + Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression); + + if (!matcher.matches()) { + throw new InvalidFieldReferenceException( + "Invalid tuple field reference \"" + fieldExpression + "\"."); + } + + String field = matcher.group(0); + + if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) || + (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) { + // handle select all + int keyPosition = 0; + for (TypeInformation<?> fType : types) { + if (fType instanceof CompositeType) { + CompositeType<?> cType = (CompositeType<?>) fType; + cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result); + keyPosition += cType.getTotalFields() - 1; + } else { + result.add(new FlatFieldDescriptor(offset + keyPosition, fType)); + } + keyPosition++; + } + } else { + field = matcher.group(1); + + Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field); + TypeInformation<?> fieldType = null; + if (intFieldMatcher.matches()) { + // field expression is an integer + int fieldIndex = Integer.valueOf(field); + if (fieldIndex > this.getArity()) { + throw new InvalidFieldReferenceException( + "Row field expression \"" + field + "\" out of bounds of " + this.toString() + "."); + } + for (int i = 0; i < fieldIndex; i++) { + offset += this.getTypeAt(i).getTotalFields(); + } + fieldType = this.getTypeAt(fieldIndex); + } else { + for (int i = 0; i < this.fieldNames.length; i++) { + if (fieldNames[i].equals(field)) { + // found field + fieldType = this.getTypeAt(i); + break; + } + offset += this.getTypeAt(i).getTotalFields(); + } + if (fieldType == null) { + throw new InvalidFieldReferenceException( + "Unable to find field \"" + field + "\" in type " + this.toString() + "."); + } + } + + String tail = matcher.group(3); + + if (tail == null) { + // expression hasn't nested field + if (fieldType instanceof CompositeType) { + ((CompositeType) fieldType).getFlatFields("*", offset, result); + } else { + result.add(new FlatFieldDescriptor(offset, fieldType)); + } + } else { + // expression has nested field + if (fieldType instanceof CompositeType) { + ((CompositeType) fieldType).getFlatFields(tail, offset, result); + } else { + throw new InvalidFieldReferenceException( + "Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + "."); + } + } + } + } + + @Override + public <X> TypeInformation<X> getTypeAt(String fieldExpression) { + Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression); + if (!matcher.matches()) { + if (fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) || + fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here."); + } else { + throw new InvalidFieldReferenceException("Invalid format of Row field expression \""+fieldExpression+"\"."); + } + } + + String field = matcher.group(1); + + Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field); + TypeInformation<X> fieldType = null; + if (intFieldMatcher.matches()) { + // field expression is an integer + int fieldIndex = Integer.valueOf(field); + if (fieldIndex > this.getArity()) { --- End diff -- `TupleTypeBase.getFieldAt()` does check for index bounds as well and throws an `IndexOutOfBoundsException`. So we could simplify this a bit here.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---