[
https://issues.apache.org/jira/browse/FLINK-5348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15767436#comment-15767436
]
ASF GitHub Bot commented on FLINK-5348:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3020#discussion_r93466347
--- Diff:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
---
@@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
}
}
+ public RowTypeInfo(List<TypeInformation<?>> types, List<String>
fieldNames) {
+ super(Row.class, types == null ? null : types.toArray(new
TypeInformation[types.size()]));
+ checkNotNull(fieldNames, "FieldNames should not be null.");
+ checkArgument(
+ types.size() == fieldNames.size(),
+ "Number of field types and names is different.");
+ checkArgument(
+ types.size() == new HashSet<>(fieldNames).size(),
+ "Field names are not unique.");
+
+ this.fieldNames = new String[fieldNames.size()];
+
+ for (int i = 0; i < fieldNames.size(); i++) {
+ this.fieldNames[i] = fieldNames.get(i);
+ }
+ }
+
+ @Override
+ public void getFlatFields(String fieldExpression, int offset,
List<FlatFieldDescriptor> result) {
+ Matcher matcher =
PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
+
+ if (!matcher.matches()) {
+ throw new InvalidFieldReferenceException(
+ "Invalid tuple field reference \"" +
fieldExpression + "\".");
+ }
+
+ String field = matcher.group(0);
+
+ if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
+ (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
+ // handle select all
+ int keyPosition = 0;
+ for (TypeInformation<?> fType : types) {
+ if (fType instanceof CompositeType) {
+ CompositeType<?> cType =
(CompositeType<?>) fType;
+
cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition,
result);
+ keyPosition += cType.getTotalFields() -
1;
+ } else {
+ result.add(new
FlatFieldDescriptor(offset + keyPosition, fType));
+ }
+ keyPosition++;
+ }
+ } else {
+ field = matcher.group(1);
+
+ Matcher intFieldMatcher =
PATTERN_INT_FIELD.matcher(field);
+ TypeInformation<?> fieldType = null;
+ if (intFieldMatcher.matches()) {
+ // field expression is an integer
+ int fieldIndex = Integer.valueOf(field);
+ if (fieldIndex > this.getArity()) {
+ throw new
InvalidFieldReferenceException(
+ "Row field expression \"" +
field + "\" out of bounds of " + this.toString() + ".");
+ }
+ for (int i = 0; i < fieldIndex; i++) {
+ offset +=
this.getTypeAt(i).getTotalFields();
+ }
+ fieldType = this.getTypeAt(fieldIndex);
+ } else {
+ for (int i = 0; i < this.fieldNames.length;
i++) {
+ if (fieldNames[i].equals(field)) {
+ // found field
+ fieldType = this.getTypeAt(i);
+ break;
+ }
+ offset +=
this.getTypeAt(i).getTotalFields();
+ }
+ if (fieldType == null) {
+ throw new
InvalidFieldReferenceException(
+ "Unable to find field \"" +
field + "\" in type " + this.toString() + ".");
+ }
+ }
+
+ String tail = matcher.group(3);
+
+ if (tail == null) {
+ // expression hasn't nested field
+ if (fieldType instanceof CompositeType) {
+ ((CompositeType)
fieldType).getFlatFields("*", offset, result);
+ } else {
+ result.add(new
FlatFieldDescriptor(offset, fieldType));
+ }
+ } else {
+ // expression has nested field
+ if (fieldType instanceof CompositeType) {
+ ((CompositeType)
fieldType).getFlatFields(tail, offset, result);
+ } else {
+ throw new
InvalidFieldReferenceException(
+ "Nested field expression \"" +
tail + "\" not possible on atomic type " + fieldType + ".");
+ }
+ }
+ }
+ }
+
+ @Override
+ public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
+ Matcher matcher =
PATTERN_NESTED_FIELDS.matcher(fieldExpression);
+ if (!matcher.matches()) {
+ if
(fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) ||
+
fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+ throw new
InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
+ } else {
+ throw new
InvalidFieldReferenceException("Invalid format of Row field expression
\""+fieldExpression+"\".");
+ }
+ }
+
+ String field = matcher.group(1);
+
+ Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
+ TypeInformation<X> fieldType = null;
+ if (intFieldMatcher.matches()) {
+ // field expression is an integer
+ int fieldIndex = Integer.valueOf(field);
+ if (fieldIndex > this.getArity()) {
+ throw new InvalidFieldReferenceException(
+ "Row field expression \"" + field + "\"
out of bounds of " + this.toString() + ".");
+ }
+ fieldType = this.getTypeAt(fieldIndex);
+ } else {
--- End diff --
Translate `field` into an index with `getFieldIndex()` and use a common
path to fetch the type?
> Support custom field names for RowTypeInfo
> ------------------------------------------
>
> Key: FLINK-5348
> URL: https://issues.apache.org/jira/browse/FLINK-5348
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Reporter: Jark Wu
> Assignee: Jark Wu
>
> Currently, RowTypeInfo doesn't support optional custom field names and is
> forced to generate {{f0}} ~ {{fn}} as field names. It would be better to
> support custom names, which would benefit some cases (e.g. FLINK-5280).
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)