Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3020#discussion_r93405238
--- Diff:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
---
@@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
}
}
+ public RowTypeInfo(List<TypeInformation<?>> types, List<String>
fieldNames) {
+ super(Row.class, types == null ? null : types.toArray(new
TypeInformation[types.size()]));
+ checkNotNull(fieldNames, "FieldNames should not be null.");
+ checkArgument(
+ types.size() == fieldNames.size(),
+ "Number of field types and names is different.");
+ checkArgument(
+ types.size() == new HashSet<>(fieldNames).size(),
+ "Field names are not unique.");
+
+ this.fieldNames = new String[fieldNames.size()];
+
+ for (int i = 0; i < fieldNames.size(); i++) {
+ this.fieldNames[i] = fieldNames.get(i);
+ }
+ }
+
+ @Override
+ public void getFlatFields(String fieldExpression, int offset,
List<FlatFieldDescriptor> result) {
+ Matcher matcher =
PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
+
+ if (!matcher.matches()) {
+ throw new InvalidFieldReferenceException(
+ "Invalid tuple field reference \"" +
fieldExpression + "\".");
+ }
+
+ String field = matcher.group(0);
+
+ if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
+ (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
+ // handle select all
+ int keyPosition = 0;
+ for (TypeInformation<?> fType : types) {
+ if (fType instanceof CompositeType) {
+ CompositeType<?> cType =
(CompositeType<?>) fType;
+
cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition,
result);
+ keyPosition += cType.getTotalFields() -
1;
+ } else {
+ result.add(new
FlatFieldDescriptor(offset + keyPosition, fType));
+ }
+ keyPosition++;
+ }
+ } else {
+ field = matcher.group(1);
+
+ Matcher intFieldMatcher =
PATTERN_INT_FIELD.matcher(field);
+ TypeInformation<?> fieldType = null;
+ if (intFieldMatcher.matches()) {
+ // field expression is an integer
+ int fieldIndex = Integer.valueOf(field);
+ if (fieldIndex > this.getArity()) {
+ throw new
InvalidFieldReferenceException(
+ "Row field expression \"" +
field + "\" out of bounds of " + this.toString() + ".");
+ }
+ for (int i = 0; i < fieldIndex; i++) {
+ offset +=
this.getTypeAt(i).getTotalFields();
+ }
+ fieldType = this.getTypeAt(fieldIndex);
+ } else {
+ for (int i = 0; i < this.fieldNames.length;
i++) {
+ if (fieldNames[i].equals(field)) {
+ // found field
+ fieldType = this.getTypeAt(i);
+ break;
+ }
+ offset +=
this.getTypeAt(i).getTotalFields();
+ }
+ if (fieldType == null) {
+ throw new
InvalidFieldReferenceException(
+ "Unable to find field \"" +
field + "\" in type " + this.toString() + ".");
+ }
+ }
+
+ String tail = matcher.group(3);
+
+ if (tail == null) {
+ // expression hasn't nested field
+ if (fieldType instanceof CompositeType) {
+ ((CompositeType)
fieldType).getFlatFields("*", offset, result);
+ } else {
+ result.add(new
FlatFieldDescriptor(offset, fieldType));
+ }
+ } else {
+ // expression has nested field
+ if (fieldType instanceof CompositeType) {
+ ((CompositeType)
fieldType).getFlatFields(tail, offset, result);
+ } else {
+ throw new
InvalidFieldReferenceException(
+ "Nested field expression \"" +
tail + "\" not possible on atomic type " + fieldType + ".");
+ }
+ }
+ }
+ }
+
+ @Override
+ public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
--- End diff --
This is the same logic as in `CaseClassTypeInfo.getTypeAt()` only ported to
Java, right?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---