Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5662#discussion_r173857218 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala --- @@ -148,6 +148,13 @@ object SchemaValidator { val schema = properties.getTableSchema(SCHEMA) + // add all source fields first because rowtime might reference one of them + toScala(sourceSchema).map(_.getColumnNames).foreach { names => --- End diff -- I think we should first remove the added source fields before adding the explicit mappings with the following snippet. ``` // add explicit mapping case Some(source) => // should add mapping.remove(source) mapping.put(name, source) ```
---