Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173784883

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -148,6 +148,13 @@ object SchemaValidator {

         val schema = properties.getTableSchema(SCHEMA)

    +    // add all source fields first because rowtime might reference one of them
    +    toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
    --- End diff --

    Hi @twalthr, can we check which `TimestampExtractor` is used here? Specifically, if it's an `ExistingField`, we include only the target fields; if it's a `StreamRecordTimestamp`, we don't include any extra fields; and only if it's a custom extractor do we include all the source fields.
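
To make the suggestion concrete, here is a rough sketch of the three-way check. It is only an illustration under stated assumptions: the types below are simplified stand-ins defined in the snippet itself, not Flink's actual `TimestampExtractor` classes, and `fieldsToInclude` and `CustomExtractor` are hypothetical names that do not exist in `SchemaValidator`.

```scala
// Stand-in types for illustration only; these are NOT Flink's real classes.
sealed trait TimestampExtractor
// Rowtime is read from one existing source field.
final case class ExistingField(field: String) extends TimestampExtractor
// Rowtime comes from the stream record's own timestamp; no source field is needed.
case object StreamRecordTimestamp extends TimestampExtractor
// Hypothetical placeholder for any user-defined extractor.
final case class CustomExtractor(name: String) extends TimestampExtractor

object SourceFieldSelection {

  /** Hypothetical helper: picks which source fields to add before rowtime is resolved. */
  def fieldsToInclude(
      extractor: TimestampExtractor,
      sourceFields: Seq[String]): Seq[String] =
    extractor match {
      // Only the referenced field is required.
      case ExistingField(field) => Seq(field)
      // Nothing extra is required.
      case StreamRecordTimestamp => Seq.empty
      // A custom extractor may reference any source field, so keep them all.
      case _: CustomExtractor => sourceFields
    }
}
```

With this shape, the "add all source fields first" step would only apply in the custom-extractor case.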
---