Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5132#discussion_r159889511
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
    @@ -450,37 +450,54 @@ abstract class StreamTableEnvironment(
         exprs: Array[Expression])
       : (Option[(Int, String)], Option[(Int, String)]) = {
     
    -    val fieldTypes: Array[TypeInformation[_]] = streamType match {
    -      case c: CompositeType[_] => (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray
    -      case a: AtomicType[_] => Array(a)
    +    val (isRefByPos, fieldTypes) = streamType match {
    +      case c: CompositeType[_] =>
    +        // determine schema definition mode (by position or by name)
    +        (isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray)
    +      case t: TypeInformation[_] =>
    +        (false, Array(t))
         }
     
         var fieldNames: List[String] = Nil
         var rowtime: Option[(Int, String)] = None
         var proctime: Option[(Int, String)] = None
     
    +    def checkRowtimeType(t: TypeInformation[_]): Unit = {
    +      if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
    +        throw new TableException(
    +          s"The rowtime attribute can only replace a field with a valid 
time type, " +
    +          s"such as Timestamp or Long. But was: $t")
    +      }
    +    }
    +
         def extractRowtime(idx: Int, name: String, origName: Option[String]): 
Unit = {
           if (rowtime.isDefined) {
             throw new TableException(
               "The rowtime attribute can only be defined once in a table 
schema.")
           } else {
    -        val mappedIdx = streamType match {
    -          case pti: PojoTypeInfo[_] =>
    -            pti.getFieldIndex(origName.getOrElse(name))
    -          case _ => idx;
    +        // if the fields are referenced by position,
    +        // it is possible to replace an existing field or append the time 
attribute at the end
    +        if (isRefByPos) {
    +          // check type of field that is replaced
    +          if (idx < 0) {
    --- End diff --
    
    Aliases are not permitted for regular fields in ref-by-pos mode. I think 
we should prohibit them for rowtime attributes as well by checking for 
`origName.isDefined`.


---

Reply via email to