[ https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312188#comment-16312188 ]

ASF GitHub Bot commented on FLINK-8203:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5132#discussion_r159767413
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -791,92 +824,109 @@ abstract class TableEnvironment(val config: TableConfig) {
         * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
         * [[Expression]]. It does not handle time attributes but considers them in indices.
         *
    +    * @param isReferenceByPosition schema mode see [[isReferenceByPosition()]]
         * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
         * @param exprs     The expressions that define the field names.
         * @tparam A The type of the TypeInformation.
         * @return A tuple of two arrays holding the field names and corresponding field positions.
         */
    -  protected[flink] def getFieldInfo[A](
    +  protected def getFieldInfo[A](
    +      isReferenceByPosition: Boolean,
           inputType: TypeInformation[A],
           exprs: Array[Expression])
         : (Array[String], Array[Int]) = {
     
         TableEnvironment.validateType(inputType)
     
    +    def referenceByName(name: String, ct: CompositeType[_]): Option[(Int, String)] = {
    +      val inputIdx = ct.getFieldIndex(name)
    +      if (inputIdx < 0) {
    +        throw new TableException(s"$name is not a field of type $ct. " +
    +                s"Expected: ${ct.getFieldNames.mkString(", ")}")
    +      } else {
    +        Some((inputIdx, name))
    +      }
    +    }
    +
         val indexedNames: Array[(Int, String)] = inputType match {
    +
           case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
             throw new TableException(
               "An input of GenericTypeInfo<Row> cannot be converted to Table. 
" +
                 "Please specify the type of the input with a RowTypeInfo.")
    -      case a: AtomicType[_] =>
    -        exprs.zipWithIndex flatMap {
    -          case (_: TimeAttribute, _) =>
    -            None
    -          case (UnresolvedFieldReference(name), idx) if idx > 0 =>
    -            // only accept the first field for an atomic type
    -            throw new TableException("Only the first field can reference an atomic type.")
    -          case (UnresolvedFieldReference(name), idx) =>
    -            // first field reference is mapped to atomic type
    -            Some((0, name))
    -          case _ => throw new TableException("Field reference expression requested.")
    -        }
    +
           case t: TupleTypeInfo[A] =>
             exprs.zipWithIndex flatMap {
    -          case (UnresolvedFieldReference(name), idx) =>
    -            Some((idx, name))
    -          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
    +          case (UnresolvedFieldReference(name: String), idx) =>
    +            if (isReferenceByPosition) {
    +              Some((idx, name))
    +            } else {
    +              referenceByName(name, t)
    +            }
    +          case (Alias(UnresolvedFieldReference(origName), name: String, _), _) =>
                 val idx = t.getFieldIndex(origName)
                 if (idx < 0) {
    -              throw new TableException(s"$origName is not a field of type $t")
    +              throw new TableException(s"$origName is not a field of type $t. " +
    +                s"Expected: ${t.getFieldNames.mkString(", ")}")
                 }
                 Some((idx, name))
               case (_: TimeAttribute, _) =>
                 None
               case _ => throw new TableException(
                 "Field reference expression or alias on field expression 
expected.")
             }
    +
           case c: CaseClassTypeInfo[A] =>
    --- End diff --
    
    Can we merge the cases of `TupleTypeInfo`, `CaseClassTypeInfo`, and `RowTypeInfo` by checking for
    
    ```
    case t: TupleTypeInfoBase[A] if t.isInstanceOf[TupleTypeInfo[A]] ||
            t.isInstanceOf[CaseClassTypeInfo[A]] || t.isInstanceOf[RowTypeInfo] =>
    ```
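
    For reference, a rough sketch of how such a merged case might look, reusing the body of the `TupleTypeInfo` case from the diff above (`getFieldIndex` and `getFieldNames` are available on `TupleTypeInfoBase`); this is only an illustration of the suggestion, not code from the PR:

    ```
    case t: TupleTypeInfoBase[A] if t.isInstanceOf[TupleTypeInfo[A]] ||
        t.isInstanceOf[CaseClassTypeInfo[A]] || t.isInstanceOf[RowTypeInfo] =>
      exprs.zipWithIndex flatMap {
        case (UnresolvedFieldReference(name: String), idx) =>
          if (isReferenceByPosition) {
            Some((idx, name))
          } else {
            referenceByName(name, t)
          }
        case (Alias(UnresolvedFieldReference(origName), name: String, _), _) =>
          // resolve the original field on the composite type and emit it under the alias
          val idx = t.getFieldIndex(origName)
          if (idx < 0) {
            throw new TableException(s"$origName is not a field of type $t. " +
              s"Expected: ${t.getFieldNames.mkString(", ")}")
          }
          Some((idx, name))
        case (_: TimeAttribute, _) =>
          None
        case _ => throw new TableException(
          "Field reference expression or alias on field expression expected.")
      }
    ```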
      


> Make schema definition of DataStream/DataSet to Table conversion more flexible
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-8203
>                 URL: https://issues.apache.org/jira/browse/FLINK-8203
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Fabian Hueske
>            Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as {{Table}}, 
> the schema of the table can be defined (by default it is extracted from the 
> {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename 
> fields, or define time attributes. Right now, there are several limitations on 
> how the fields can be defined, which also depend on the type of the 
> {{DataStream}} / {{DataSet}}. Types with an explicit field order (e.g., 
> tuples, case classes, Row) require schema definition based on the position of 
> fields. POJO types, which have no fixed field order, require fields to be 
> referenced by name. Moreover, there are several restrictions on how time 
> attributes can be defined, e.g., an event-time attribute must replace an 
> existing field or be appended, and proctime attributes must be appended at the end.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are 
> referenced by name (and possibly renamed using an alias, {{as}}). In this 
> mode, fields can be reordered and projected out. Moreover, we can define 
> proctime and event-time attributes at arbitrary positions using arbitrary 
> names (except those that already exist in the result schema). This mode can be 
> used for any input type, including POJOs. This mode is used if all field 
> references exist in the input type.
> 2. Reference input fields by position: Field references might not refer to 
> existing fields in the input type. In this mode, fields are simply renamed. 
> Event-time attributes can replace the field at their position in the input 
> data (if it is of the correct type) or be appended at the end. Proctime 
> attributes must be appended at the end. This mode can only be used if the 
> input type has a defined field order (tuple, case class, Row).
> We need to add more tests that check for all combinations of input types and 
> schema definition modes.
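
A minimal Scala sketch (assuming the Flink 1.4 Scala Table API: `toTable` and the `.proctime` suffix) of how the two proposed modes could look from the user's side; the field names and the tuple field identifiers (`_1`, `_2`, `_3`) used for by-name references are illustrative assumptions, not final API behavior:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object SchemaModesSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val stream: DataStream[(Long, String, Int)] = env.fromElements((1L, "a", 42))

    // Mode 2, reference by position: the three tuple fields are renamed in order,
    // and a proctime attribute is appended at the end.
    val byPosition = stream.toTable(tEnv, 'ts, 'name, 'amount, 'proctime.proctime)

    // Mode 1, reference by name: existing fields are selected, reordered, and
    // renamed via aliases; this mode would also work for POJO inputs.
    val byName = stream.toTable(tEnv, '_3 as 'amount, '_1 as 'ts)
  }
}
```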



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
