[ 
https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313209#comment-16313209
 ] 

ASF GitHub Bot commented on FLINK-8203:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5132#discussion_r159876208
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -446,52 +443,60 @@ abstract class StreamTableEnvironment(
         * Checks for at most one rowtime and proctime attribute.
         * Returns the time attributes.
         *
    -    * @param isReferenceByPosition schema mode see [[isReferenceByPosition()]]
    -    *
         * @return rowtime attribute and proctime attribute
         */
       private def validateAndExtractTimeAttributes(
    -    isReferenceByPosition: Boolean,
         streamType: TypeInformation[_],
         exprs: Array[Expression])
       : (Option[(Int, String)], Option[(Int, String)]) = {
     
    -    val fieldTypes: Array[TypeInformation[_]] = streamType match {
    -      case c: CompositeType[_] => (0 until c.getArity).map(i => c.getTypeAt(i)).toArray
    -      case t: TypeInformation[_] => Array(t)
    +    val (isRefByPos, fieldTypes) = streamType match {
    +      case c: CompositeType[_] =>
    +        // determine schema definition mode (by position or by name)
    +        (isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => c.getTypeAt(i)).toArray)
    +      case t: TypeInformation[_] =>
    +        (false, Array(t))
         }
     
         var fieldNames: List[String] = Nil
         var rowtime: Option[(Int, String)] = None
         var proctime: Option[(Int, String)] = None
     
    +    def checkRowtimeType(t: TypeInformation[_]): Unit = {
    +      if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
    +        throw new TableException(
    +          s"The rowtime attribute can only replace a field with a valid time type, " +
    +          s"such as Timestamp or Long. But was: $t")
    +      }
    +    }
    +
         def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
           if (rowtime.isDefined) {
             throw new TableException(
               "The rowtime attribute can only be defined once in a table schema.")
           } else {
             // if the fields are referenced by position,
             // it is possible to replace an existing field or append the time attribute at the end
    -        if (isReferenceByPosition) {
    -
    -          val mappedIdx = streamType match {
    -            case pti: PojoTypeInfo[_] =>
    -              pti.getFieldIndex(origName.getOrElse(name))
    -            case _ => idx;
    -          }
    -
    +        if (isRefByPos) {
               // check type of field that is replaced
    -          if (mappedIdx < 0) {
    +          if (idx < 0) {
                 throw new TableException(
                   s"The rowtime attribute can only replace a valid field. " +
                     s"${origName.getOrElse(name)} is not a field of type $streamType.")
               }
    -          else if (mappedIdx < fieldTypes.length &&
    -            !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
    -              TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
    -            throw new TableException(
    -              s"The rowtime attribute can only replace a field with a valid time type, " +
    -                s"such as Timestamp or Long. But was: ${fieldTypes(mappedIdx)}")
    +          else if (idx < fieldTypes.length) {
    +            checkRowtimeType(fieldTypes(idx))
    +          }
    +        }
    +        // check for valid alias if referenced by name
    +        else if (origName.isDefined) {
    +          // check for valid alias
    +          streamType match {
    +            case ct: CompositeType[_] if ct.hasField(origName.get) =>
    +              val t = ct.getTypeAt(ct.getFieldIndex(origName.get))
    +              checkRowtimeType(t)
    +            case _ =>
    +              throw new TableException("An alias must always reference an existing field.")
    --- End diff --
    
    Add `origName` (and the existing fields?) to the error message.
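
One possible shape for such a message is sketched below. This is only an illustration, not the wording used in the PR: `AliasErrorSketch` and `invalidAliasMessage` are hypothetical names, and the local `TableException` is a stand-in so that the snippet compiles without Flink on the classpath. In the diff above, the existing field names could presumably be taken from the `CompositeType` (for example via `getFieldNames`).

```scala
// Hypothetical stand-in for Flink's TableException so the sketch is self-contained.
final class TableException(msg: String) extends RuntimeException(msg)

object AliasErrorSketch {

  // Builds an error message that names the missing field referenced by the alias
  // and lists the fields that actually exist in the input type.
  def invalidAliasMessage(origName: String, existingFields: Seq[String]): String =
    "An alias must always reference an existing field. " +
      s"'$origName' is not a field of the input type. " +
      s"Existing fields: ${existingFields.mkString("[", ", ", "]")}."

  // Throws the (stand-in) exception with the extended message.
  def failInvalidAlias(origName: String, existingFields: Seq[String]): Nothing =
    throw new TableException(invalidAliasMessage(origName, existingFields))
}
```

Listing the existing fields makes a typo in the referenced name easy to spot from the exception alone.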


> Make schema definition of DataStream/DataSet to Table conversion more flexible
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-8203
>                 URL: https://issues.apache.org/jira/browse/FLINK-8203
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Fabian Hueske
>            Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}}, 
> the schema of the table can be defined (by default, it is extracted from the 
> {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename 
> fields, or define time attributes. Right now, there are several limitations 
> on how the fields can be defined, and they also depend on the type of the 
> {{DataStream}} / {{DataSet}}. Types with explicit field ordering (e.g., 
> tuples, case classes, Row) require schema definition based on the position of 
> fields. POJO types, which have no fixed field order, require fields to be 
> referenced by name. Moreover, there are several restrictions on how time 
> attributes can be defined, e.g., an event-time attribute must replace an 
> existing field or be appended, and proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are 
> referenced by name (and possibly renamed using an alias, {{as}}). In this 
> mode, fields can be reordered and projected out. Moreover, we can define 
> proctime and event-time attributes at arbitrary positions using arbitrary 
> names (except those that already exist in the result schema). This mode can 
> be used for any input type, including POJOs. This mode is used if all field 
> references exist in the input type.
> 2. Reference input fields by position: Field references might not refer to 
> existing fields in the input type. In this mode, fields are simply renamed. 
> Event-time attributes can replace the field at their position in the input 
> data (if it is of the correct type) or be appended at the end. Proctime 
> attributes must be appended at the end. This mode can only be used if the 
> input type has a defined field order (tuple, case class, Row).
> We need to add more tests that check all combinations of input types and 
> schema definition modes.
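
The mode-selection rule described in the two points above can be sketched roughly as follows. This is a simplified illustration of the rule as stated in the issue, not the implementation from the pull request: `SchemaModeSketch`, `chooseMode`, and the `hasFixedOrder` flag are hypothetical names, field references are modeled as plain strings, and time attributes are ignored.

```scala
object SchemaModeSketch {

  sealed trait SchemaMode
  case object ByName extends SchemaMode
  case object ByPosition extends SchemaMode

  // inputFields:      field names of the input type
  // referencedFields: field names referenced in the schema definition
  // hasFixedOrder:    true for tuples, case classes, and Row; false for POJOs
  def chooseMode(
      inputFields: Seq[String],
      referencedFields: Seq[String],
      hasFixedOrder: Boolean): Either[String, SchemaMode] = {
    if (referencedFields.forall(inputFields.contains)) {
      // every reference resolves to an input field: reference by name
      Right(ByName)
    } else if (hasFixedOrder) {
      // some references are new names: fields are renamed by position
      Right(ByPosition)
    } else {
      Left("Input type has no defined field order, so all fields must be referenced by name.")
    }
  }
}
```

For example, under these assumptions `chooseMode(Seq("f0", "f1"), Seq("a", "b"), hasFixedOrder = true)` selects the by-position mode, while the same references against a POJO-like input (`hasFixedOrder = false`) are rejected.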



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
