[ https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313209#comment-16313209 ]
ASF GitHub Bot commented on FLINK-8203:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5132#discussion_r159876208

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -446,52 +443,60 @@ abstract class StreamTableEnvironment(
     * Checks for at most one rowtime and proctime attribute.
     * Returns the time attributes.
     *
-    * @param isReferenceByPosition schema mode see [[isReferenceByPosition()]]
-    *
     * @return rowtime attribute and proctime attribute
     */
   private def validateAndExtractTimeAttributes(
-    isReferenceByPosition: Boolean,
     streamType: TypeInformation[_],
     exprs: Array[Expression])
   : (Option[(Int, String)], Option[(Int, String)]) = {
 
-    val fieldTypes: Array[TypeInformation[_]] = streamType match {
-      case c: CompositeType[_] => (0 until c.getArity).map(i => c.getTypeAt(i)).toArray
-      case t: TypeInformation[_] => Array(t)
+    val (isRefByPos, fieldTypes) = streamType match {
+      case c: CompositeType[_] =>
+        // determine schema definition mode (by position or by name)
+        (isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => c.getTypeAt(i)).toArray)
+      case t: TypeInformation[_] =>
+        (false, Array(t))
     }
 
     var fieldNames: List[String] = Nil
     var rowtime: Option[(Int, String)] = None
     var proctime: Option[(Int, String)] = None
 
+    def checkRowtimeType(t: TypeInformation[_]): Unit = {
+      if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+        throw new TableException(
+          s"The rowtime attribute can only replace a field with a valid time type, " +
+          s"such as Timestamp or Long. But was: $t")
+      }
+    }
+
     def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
       if (rowtime.isDefined) {
         throw new TableException(
           "The rowtime attribute can only be defined once in a table schema.")
       } else {
         // if the fields are referenced by position,
         // it is possible to replace an existing field or append the time attribute at the end
-        if (isReferenceByPosition) {
-
-          val mappedIdx = streamType match {
-            case pti: PojoTypeInfo[_] =>
-              pti.getFieldIndex(origName.getOrElse(name))
-            case _ => idx;
-          }
-
+        if (isRefByPos) {
           // check type of field that is replaced
-          if (mappedIdx < 0) {
+          if (idx < 0) {
             throw new TableException(
               s"The rowtime attribute can only replace a valid field. " +
               s"${origName.getOrElse(name)} is not a field of type $streamType.")
           }
-          else if (mappedIdx < fieldTypes.length &&
-            !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
-              TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
-            throw new TableException(
-              s"The rowtime attribute can only replace a field with a valid time type, " +
-              s"such as Timestamp or Long. But was: ${fieldTypes(mappedIdx)}")
+          else if (idx < fieldTypes.length) {
+            checkRowtimeType(fieldTypes(idx))
+          }
+        }
+        // check for valid alias if referenced by name
+        else if (origName.isDefined) {
+          // check for valid alias
+          streamType match {
+            case ct: CompositeType[_] if ct.hasField(origName.get) =>
+              val t = ct.getTypeAt(ct.getFieldIndex(origName.get))
+              checkRowtimeType(t)
+            case _ =>
+              throw new TableException("An alias must always reference an existing field.")
--- End diff --

Add `origName` (and the existing fields?) to the error message.
> Make schema definition of DataStream/DataSet to Table conversion more flexible
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-8203
>                 URL: https://issues.apache.org/jira/browse/FLINK-8203
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Fabian Hueske
>            Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}}, the schema of the table can be defined (by default it is extracted from the {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename fields, or define time attributes. Right now, there are several limitations on how the fields can be defined, which also depend on the type of the {{DataStream}} / {{DataSet}}. Types with an explicit field order (e.g., tuples, case classes, Row) require schema definition based on the position of fields. POJO types, which have no fixed field order, require fields to be referenced by name. Moreover, there are several restrictions on how time attributes can be defined: an event-time attribute must replace an existing field or be appended, and proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are referenced by name (and possibly renamed using an alias ({{as}})). In this mode, fields can be reordered and projected out. Moreover, we can define proctime and event-time attributes at arbitrary positions using arbitrary names (except those that already exist in the result schema). This mode can be used for any input type, including POJOs. It is chosen if all field references exist in the input type.
> 2. Reference input fields by position: Field references might not refer to existing fields in the input type. In this mode, fields are simply renamed. Event-time attributes can replace the field at their position in the input data (if it is of the correct type) or be appended at the end. Proctime attributes must be appended at the end. This mode can only be used if the input type has a defined field order (tuple, case class, Row).
> We need to add more tests that check all combinations of input types and schema definition modes.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
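The mode selection proposed in the issue (by-name mode applies only when every field reference exists in the input type, otherwise fall back to by-position mode) can be sketched in a few lines. This is an illustrative sketch, not Flink's actual implementation; all names below are hypothetical.

```scala
object SchemaModeSketch {
  sealed trait Mode
  case object ByName extends Mode
  case object ByPosition extends Mode

  // By-name mode is used only if every referenced field exists in the
  // input type; otherwise field references are treated positionally.
  def selectMode(inputFields: Set[String], referencedFields: Seq[String]): Mode =
    if (referencedFields.forall(inputFields.contains)) ByName else ByPosition
}
```

For example, with an input type whose fields are `user`, `amount`, and `ts`, the references `Seq("ts", "user")` would select by-name mode (reordering and projection), whereas `Seq("f0", "f1", "f2")` would select by-position mode (pure renaming), matching the two modes described above.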