[ https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392933#comment-16392933 ]
ASF GitHub Bot commented on FLINK-8854:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173465435

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -198,14 +205,20 @@ object SchemaValidator {
           val isProctime = properties
             .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
             .orElse(false)
    -      val isRowtime = properties
    -        .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
    +      val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
    +      val isRowtime = properties.containsKey(tsType)
           if (!isProctime && !isRowtime) {
             // check for a aliasing
             val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
               .orElse(n)
             builder.field(fieldName, t)
           }
    +      // only use the rowtime attribute if it references a field
    +      else if (isRowtime &&
    +          properties.getString(tsType) == ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) {
    --- End diff --

    You are right, we should declare `ExistingField` `final`. In the custom extractor case, a user has to supply the format manually. Maybe we will need some explanation logic in the future so that a user can see what the derived format looks like and whether it makes sense to declare it explicitly.


> Mapping of SchemaValidator.deriveFieldMapping() is incorrect.
> -------------------------------------------------------------
>
>                 Key: FLINK-8854
>                 URL: https://issues.apache.org/jira/browse/FLINK-8854
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Fabian Hueske
>            Assignee: Timo Walther
>            Priority: Blocker
>             Fix For: 1.5.0, 1.6.0
>
>
> The field mapping returned by {{SchemaValidator.deriveFieldMapping()}} is not correct.
> It should not only include all fields of the table schema, but also all fields of the format schema (mapped to themselves).
> Otherwise, it is not possible to use a timestamp extractor on a field that is not in the table schema.
> For example, this configuration would fail:
> {code}
> sources:
>   - name: TaxiRides
>     schema:
>       - name: rideId
>         type: LONG
>       - name: rowTime
>         type: TIMESTAMP
>         rowtime:
>           timestamps:
>             type: "from-field"
>             from: "rideTime"
>           watermarks:
>             type: "periodic-bounded"
>             delay: "60000"
>     connector:
>       ....
>     format:
>       property-version: 1
>       type: json
>       schema: "ROW(rideId LONG, rideTime TIMESTAMP)"
> {code}
> because {{rideTime}} is not in the table schema.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
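
The mapping rule requested in the issue description can be illustrated with a minimal, self-contained Scala sketch. It is not the Flink implementation and uses no Flink classes: `TableField`, `FieldMappingSketch`, and the simplified `deriveFieldMapping` signature are hypothetical names introduced only for this illustration. The sketch assumes that every format field is first mapped to itself, that table fields with an explicit `from` alias then override that identity entry, and that time attributes (proctime, or rowtime with a "from-field" extractor) are not physical fields and so receive no entry of their own.

```scala
// Hypothetical, simplified model of a table schema field (not a Flink class).
final case class TableField(
    name: String,
    from: Option[String] = None,        // explicit alias, like the "from" property
    rowtimeFrom: Option[String] = None, // source field of a "from-field" extractor
    isProctime: Boolean = false)

object FieldMappingSketch {

  /** Maps each logical field name to the format field it is read from. */
  def deriveFieldMapping(
      tableSchema: Seq[TableField],
      formatFields: Seq[String]): Map[String, String] = {

    // 1. Every format field maps to itself, so a timestamp extractor can
    //    reference a field that never appears in the table schema.
    val formatIdentity = formatFields.map(f => f -> f)

    // 2. Physical table fields map to their alias, or to themselves.
    //    Rowtime and proctime attributes are skipped.
    val tableMappings = tableSchema.collect {
      case TableField(name, from, None, false) => name -> from.getOrElse(name)
    }

    // Later entries win in toMap, so explicit aliases override identity entries.
    (formatIdentity ++ tableMappings).toMap
  }
}
```

For the TaxiRides configuration above, calling `deriveFieldMapping` with table fields `rideId` and `rowTime` (the latter with `rowtimeFrom = Some("rideTime")`) and format fields `rideId` and `rideTime` yields a mapping that contains `rideTime`, which is the entry the timestamp extractor needs.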