[ https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312188#comment-16312188 ]
ASF GitHub Bot commented on FLINK-8203:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5132#discussion_r159767413

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -791,92 +824,109 @@ abstract class TableEnvironment(val config: TableConfig) {
        * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
        * [[Expression]]. It does not handle time attributes but considers them in indices.
        *
    +   * @param isReferenceByPosition schema mode see [[isReferenceByPosition()]]
        * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
        * @param exprs The expressions that define the field names.
        * @tparam A The type of the TypeInformation.
        * @return A tuple of two arrays holding the field names and corresponding field positions.
        */
    -  protected[flink] def getFieldInfo[A](
    +  protected def getFieldInfo[A](
    +      isReferenceByPosition: Boolean,
           inputType: TypeInformation[A],
           exprs: Array[Expression])
         : (Array[String], Array[Int]) = {

         TableEnvironment.validateType(inputType)

    +    def referenceByName(name: String, ct: CompositeType[_]): Option[(Int, String)] = {
    +      val inputIdx = ct.getFieldIndex(name)
    +      if (inputIdx < 0) {
    +        throw new TableException(s"$name is not a field of type $ct. " +
    +          s"Expected: ${ct.getFieldNames.mkString(", ")}")
    +      } else {
    +        Some((inputIdx, name))
    +      }
    +    }
    +
         val indexedNames: Array[(Int, String)] = inputType match {
    +      case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
    +        throw new TableException(
    +          "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
    +            "Please specify the type of the input with a RowTypeInfo.")
    -      case a: AtomicType[_] =>
    -        exprs.zipWithIndex flatMap {
    -          case (_: TimeAttribute, _) =>
    -            None
    -          case (UnresolvedFieldReference(name), idx) if idx > 0 =>
    -            // only accept the first field for an atomic type
    -            throw new TableException("Only the first field can reference an atomic type.")
    -          case (UnresolvedFieldReference(name), idx) =>
    -            // first field reference is mapped to atomic type
    -            Some((0, name))
    -          case _ => throw new TableException("Field reference expression requested.")
    -        }
    +      case t: TupleTypeInfo[A] =>
             exprs.zipWithIndex flatMap {
    -          case (UnresolvedFieldReference(name), idx) =>
    -            Some((idx, name))
    -          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
    +          case (UnresolvedFieldReference(name: String), idx) =>
    +            if (isReferenceByPosition) {
    +              Some((idx, name))
    +            } else {
    +              referenceByName(name, t)
    +            }
    +          case (Alias(UnresolvedFieldReference(origName), name: String, _), _) =>
                 val idx = t.getFieldIndex(origName)
                 if (idx < 0) {
    -              throw new TableException(s"$origName is not a field of type $t")
    +              throw new TableException(s"$origName is not a field of type $t. " +
    +                s"Expected: ${t.getFieldNames.mkString(", ")}")
                 }
                 Some((idx, name))
               case (_: TimeAttribute, _) =>
                 None
               case _ => throw new TableException(
                 "Field reference expression or alias on field expression expected.")
             }
    +      case c: CaseClassTypeInfo[A] =>
    --- End diff --

    Can we merge the cases of `TupleTypeInfo`, `CaseClassTypeInfo`, and `RowTypeInfo` by checking for

    ```
    case t: TupleTypeInfoBase[A] if t.isInstanceOf[TupleTypeInfo[A]] ||
        t.isInstanceOf[CaseClassTypeInfo[A]] || t.isInstanceOf[RowTypeInfo] =>
    ```

> Make schema definition of DataStream/DataSet to Table conversion more flexible
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-8203
>                 URL: https://issues.apache.org/jira/browse/FLINK-8203
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Fabian Hueske
>            Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}},
> the schema of the table can be defined explicitly (by default it is extracted
> from the {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename
> fields, or define time attributes. Right now, there are several limitations on
> how the fields can be defined, and they also depend on the type of the
> {{DataStream}} / {{DataSet}}. Types with an explicit field order (e.g., tuples,
> case classes, Row) require a schema definition based on the position of fields.
> POJO types, which have no fixed order of fields, require referring to fields by
> name. Moreover, there are several restrictions on how time attributes can be
> defined, e.g., an event-time attribute must replace an existing field or be
> appended, and proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are
> referenced by name (and possibly renamed using an alias ({{as}})). In this
> mode, fields can be reordered and projected out. Moreover, we can define
> proctime and event-time attributes at arbitrary positions using arbitrary
> names (except those that exist in the result schema). This mode can be used
> for any input type, including POJOs. This mode is used if all field references
> exist in the input type.
> 2. Reference input fields by position: Field references might not refer to
> existing fields in the input type. In this mode, fields are simply renamed.
> Event-time attributes can replace the field at their position in the input
> data (if it is of the correct type) or be appended at the end. Proctime
> attributes must be appended at the end. This mode can only be used if the
> input type has a defined field order (tuple, case class, Row).
> We need to add more tests that check all combinations of input types and
> schema definition modes.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
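The two schema-definition modes proposed in the issue can be sketched as a small, dependency-free model. The names below (`FieldResolution`, `resolve`) are hypothetical; the real logic lives in `TableEnvironment.getFieldInfo` and works on Flink's `TypeInformation` rather than plain field-name lists, but the core distinction is the same: position mode renames fields in order, name mode looks each reference up in the input type and allows reordering and projection.

```scala
// Simplified model of the two schema-definition modes (hypothetical names;
// not the actual Flink API). Mirrors the shape of getFieldInfo's result:
// pairs of (field name, index in the input type).
object FieldResolution {

  def resolve(
      isReferenceByPosition: Boolean,
      inputFields: Seq[String],
      requested: Seq[String]): Seq[(String, Int)] = {

    if (isReferenceByPosition) {
      // Position mode: the i-th requested name simply renames input field i.
      requested.zipWithIndex
    } else {
      // Name mode: every requested name must exist in the input type;
      // fields may be reordered or projected out.
      requested.map { name =>
        val idx = inputFields.indexOf(name)
        if (idx < 0) {
          throw new IllegalArgumentException(
            s"$name is not a field of the input type. " +
              s"Expected: ${inputFields.mkString(", ")}")
        }
        (name, idx)
      }
    }
  }
}
```

For example, position mode maps `Seq("a", "b")` over input fields `(f0, f1)` to `Seq(("a", 0), ("b", 1))`, while name mode resolves `Seq("age", "name")` against input fields `(name, age)` to `Seq(("age", 1), ("name", 0))`, i.e., a reordering. A name that does not exist in the input type fails in name mode, which matches the issue's rule that name-based referencing applies only when all field references exist in the input type.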