Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3020#discussion_r93566506
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
---
    @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
                }
        }
     
    +   public RowTypeInfo(List<TypeInformation<?>> types, List<String> 
fieldNames) {
    +           super(Row.class, types == null ? null : types.toArray(new 
TypeInformation[types.size()]));
    +           checkNotNull(fieldNames, "FieldNames should not be null.");
    +           checkArgument(
    +                   types.size() == fieldNames.size(),
    +                   "Number of field types and names is different.");
    +           checkArgument(
    +                   types.size() == new HashSet<>(fieldNames).size(),
    +                   "Field names are not unique.");
    +
    +           this.fieldNames = new String[fieldNames.size()];
    +
    +           for (int i = 0; i < fieldNames.size(); i++) {
    +                   this.fieldNames[i] = fieldNames.get(i);
    +           }
    +   }
    +
    +   @Override
    +   public void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
    +           Matcher matcher = 
PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
    +
    +           if (!matcher.matches()) {
    +                   throw new InvalidFieldReferenceException(
    +                           "Invalid tuple field reference \"" + 
fieldExpression + "\".");
    +           }
    +
    +           String field = matcher.group(0);
    +
    +           if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
    +                   (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
    +                   // handle select all
    +                   int keyPosition = 0;
    +                   for (TypeInformation<?> fType : types) {
    +                           if (fType instanceof CompositeType) {
    +                                   CompositeType<?> cType = 
(CompositeType<?>) fType;
    +                                   
cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, 
result);
    +                                   keyPosition += cType.getTotalFields() - 
1;
    +                           } else {
    +                                   result.add(new 
FlatFieldDescriptor(offset + keyPosition, fType));
    +                           }
    +                           keyPosition++;
    +                   }
    +           } else {
    +                   field = matcher.group(1);
    +
    +                   Matcher intFieldMatcher = 
PATTERN_INT_FIELD.matcher(field);
    +                   TypeInformation<?> fieldType = null;
    +                   if (intFieldMatcher.matches()) {
    +                           // field expression is an integer
    +                           int fieldIndex = Integer.valueOf(field);
    +                           if (fieldIndex > this.getArity()) {
    +                                   throw new 
InvalidFieldReferenceException(
    +                                           "Row field expression \"" + 
field + "\" out of bounds of " + this.toString() + ".");
    +                           }
    +                           for (int i = 0; i < fieldIndex; i++) {
    +                                   offset += 
this.getTypeAt(i).getTotalFields();
    +                           }
    +                           fieldType = this.getTypeAt(fieldIndex);
    +                   } else {
    +                           for (int i = 0; i < this.fieldNames.length; 
i++) {
    +                                   if (fieldNames[i].equals(field)) {
    +                                           // found field
    +                                           fieldType = this.getTypeAt(i);
    +                                           break;
    +                                   }
    +                                   offset += 
this.getTypeAt(i).getTotalFields();
    +                           }
    +                           if (fieldType == null) {
    +                                   throw new 
InvalidFieldReferenceException(
    +                                           "Unable to find field \"" + 
field + "\" in type " + this.toString() + ".");
    +                           }
    +                   }
    +
    +                   String tail = matcher.group(3);
    +
    +                   if (tail == null) {
    +                           // expression hasn't nested field
    +                           if (fieldType instanceof CompositeType) {
    +                                   ((CompositeType) 
fieldType).getFlatFields("*", offset, result);
    +                           } else {
    +                                   result.add(new 
FlatFieldDescriptor(offset, fieldType));
    +                           }
    +                   } else {
    +                           // expression has nested field
    +                           if (fieldType instanceof CompositeType) {
    +                                   ((CompositeType) 
fieldType).getFlatFields(tail, offset, result);
    +                           } else {
    +                                   throw new 
InvalidFieldReferenceException(
    +                                           "Nested field expression \"" + 
tail + "\" not possible on atomic type " + fieldType + ".");
    +                           }
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
    --- End diff --
    
    This is ported almost verbatim from CaseClassTypeInfo.getFlatFields(), except 
for the field index: CaseClassTypeInfo is 1-based, whereas RowTypeInfo is 0-based.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to