Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3020#discussion_r93465353
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
---
    @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
                }
        }
     
    +   public RowTypeInfo(List<TypeInformation<?>> types, List<String> 
fieldNames) {
    +           super(Row.class, types == null ? null : types.toArray(new 
TypeInformation[types.size()]));
    +           checkNotNull(fieldNames, "FieldNames should not be null.");
    +           checkArgument(
    +                   types.size() == fieldNames.size(),
    +                   "Number of field types and names is different.");
    +           checkArgument(
    +                   types.size() == new HashSet<>(fieldNames).size(),
    +                   "Field names are not unique.");
    +
    +           this.fieldNames = new String[fieldNames.size()];
    +
    +           for (int i = 0; i < fieldNames.size(); i++) {
    +                   this.fieldNames[i] = fieldNames.get(i);
    +           }
    +   }
    +
    +   @Override
    +   public void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
    +           Matcher matcher = 
PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
    +
    +           if (!matcher.matches()) {
    +                   throw new InvalidFieldReferenceException(
    +                           "Invalid tuple field reference \"" + 
fieldExpression + "\".");
    +           }
    +
    +           String field = matcher.group(0);
    +
    +           if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
    +                   (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
    +                   // handle select all
    +                   int keyPosition = 0;
    +                   for (TypeInformation<?> fType : types) {
    +                           if (fType instanceof CompositeType) {
    +                                   CompositeType<?> cType = 
(CompositeType<?>) fType;
    +                                   
cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, 
result);
    +                                   keyPosition += cType.getTotalFields() - 
1;
    +                           } else {
    +                                   result.add(new 
FlatFieldDescriptor(offset + keyPosition, fType));
    +                           }
    +                           keyPosition++;
    +                   }
    +           } else {
    +                   field = matcher.group(1);
    +
    +                   Matcher intFieldMatcher = 
PATTERN_INT_FIELD.matcher(field);
    +                   TypeInformation<?> fieldType = null;
    +                   if (intFieldMatcher.matches()) {
    +                           // field expression is an integer
    +                           int fieldIndex = Integer.valueOf(field);
    +                           if (fieldIndex > this.getArity()) {
    +                                   throw new 
InvalidFieldReferenceException(
    +                                           "Row field expression \"" + 
field + "\" out of bounds of " + this.toString() + ".");
    +                           }
    +                           for (int i = 0; i < fieldIndex; i++) {
    +                                   offset += 
this.getTypeAt(i).getTotalFields();
    +                           }
    +                           fieldType = this.getTypeAt(fieldIndex);
    +                   } else {
    +                           for (int i = 0; i < this.fieldNames.length; 
i++) {
    +                                   if (fieldNames[i].equals(field)) {
    +                                           // found field
    +                                           fieldType = this.getTypeAt(i);
    +                                           break;
    +                                   }
    +                                   offset += 
this.getTypeAt(i).getTotalFields();
    +                           }
    +                           if (fieldType == null) {
    +                                   throw new 
InvalidFieldReferenceException(
    +                                           "Unable to find field \"" + 
field + "\" in type " + this.toString() + ".");
    +                           }
    +                   }
    +
    +                   String tail = matcher.group(3);
    +
    +                   if (tail == null) {
    +                           // expression hasn't nested field
    +                           if (fieldType instanceof CompositeType) {
    +                                   ((CompositeType) 
fieldType).getFlatFields("*", offset, result);
    +                           } else {
    +                                   result.add(new 
FlatFieldDescriptor(offset, fieldType));
    +                           }
    +                   } else {
    +                           // expression has nested field
    +                           if (fieldType instanceof CompositeType) {
    +                                   ((CompositeType) 
fieldType).getFlatFields(tail, offset, result);
    +                           } else {
    +                                   throw new 
InvalidFieldReferenceException(
    +                                           "Nested field expression \"" + 
tail + "\" not possible on atomic type " + fieldType + ".");
    +                           }
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
    +           Matcher matcher = 
PATTERN_NESTED_FIELDS.matcher(fieldExpression);
    +           if (!matcher.matches()) {
    +                   if 
(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) ||
    +                           
fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
    +                           throw new 
InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
    +                   } else {
    +                           throw new 
InvalidFieldReferenceException("Invalid format of Row field expression 
\""+fieldExpression+"\".");
    +                   }
    +           }
    +
    +           String field = matcher.group(1);
    +
    +           Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
    +           TypeInformation<X> fieldType = null;
    +           if (intFieldMatcher.matches()) {
    +                   // field expression is an integer
    +                   int fieldIndex = Integer.valueOf(field);
    +                   if (fieldIndex > this.getArity()) {
    --- End diff --
    
    `TupleTypeBase.getFieldAt()` does check for index bounds as well and throws 
an `IndexOutOfBoundsException`.
    So we could simplify this a bit here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to