Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5132#discussion_r159889511 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -450,37 +450,54 @@ abstract class StreamTableEnvironment( exprs: Array[Expression]) : (Option[(Int, String)], Option[(Int, String)]) = { - val fieldTypes: Array[TypeInformation[_]] = streamType match { - case c: CompositeType[_] => (0 until c.getArity).map(i => c.getTypeAt(i)).toArray - case a: AtomicType[_] => Array(a) + val (isRefByPos, fieldTypes) = streamType match { + case c: CompositeType[_] => + // determine schema definition mode (by position or by name) + (isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => c.getTypeAt(i)).toArray) + case t: TypeInformation[_] => + (false, Array(t)) } var fieldNames: List[String] = Nil var rowtime: Option[(Int, String)] = None var proctime: Option[(Int, String)] = None + def checkRowtimeType(t: TypeInformation[_]): Unit = { + if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) { + throw new TableException( + s"The rowtime attribute can only replace a field with a valid time type, " + + s"such as Timestamp or Long. But was: $t") + } + } + def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = { if (rowtime.isDefined) { throw new TableException( "The rowtime attribute can only be defined once in a table schema.") } else { - val mappedIdx = streamType match { - case pti: PojoTypeInfo[_] => - pti.getFieldIndex(origName.getOrElse(name)) - case _ => idx; + // if the fields are referenced by position, + // it is possible to replace an existing field or append the time attribute at the end + if (isRefByPos) { + // check type of field that is replaced + if (idx < 0) { --- End diff -- aliases are not permitted for regular fields in ref-by-pos mode. I think prohibit them for rowtime attributes as well by checking for `origName.isDefined`.
---