[ https://issues.apache.org/jira/browse/FLINK-5662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15863047#comment-15863047 ]
ASF GitHub Bot commented on FLINK-5662: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3277#discussion_r100708516 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -428,6 +431,113 @@ abstract class TableEnvironment(val config: TableConfig) { (fieldNames.toArray, fieldIndexes.toArray) } + /** + * Creates a final converter that maps the internal row type to external type. + */ + protected def sinkConversion[T]( + physicalRowTypeInfo: TypeInformation[Row], + logicalRowType: RelDataType, + expectedTypeInfo: TypeInformation[T], + functionName: String) + : Option[MapFunction[Row, T]] = { + + // validate that at least the field types of physical and logical type match + // we do that here to make sure that plan translation was correct + val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType) + if (physicalRowTypeInfo != logicalRowTypeInfo) { + throw TableException("The field types of physical and logical row types do not match." + + "This is a bug and should not happen. Please file an issue.") + } + + // expected type is a row, no conversion needed + // TODO this logic will change with FLINK-5429 + if (expectedTypeInfo.getTypeClass == classOf[Row]) { + return None + } + + // convert to type information + val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType => + FlinkTypeFactory.toTypeInfo(relDataType.getType) + } + // field names + val logicalFieldNames = logicalRowType.getFieldNames.asScala + + // validate expected type + if (expectedTypeInfo.getArity != logicalFieldTypes.length) { + throw new TableException("Arity of result does not match expected type.") + } + expectedTypeInfo match { + + // POJO type expected + case pt: PojoTypeInfo[_] => + logicalFieldNames.zip(logicalFieldTypes) foreach { + case (fName, fType) => + val pojoIdx = pt.getFieldIndex(fName) + if (pojoIdx < 0) { + throw new TableException(s"POJO does not define field name: $fName") + } + val expectedTypeInfo = pt.getTypeAt(pojoIdx) + if (fType != expectedTypeInfo) { + throw new TableException(s"Result field does not match expected type. " + + s"Expected: $expectedTypeInfo; Actual: $fType") + } + } + + // Tuple/Case class type expected + case ct: CompositeType[_] => + logicalFieldTypes.zipWithIndex foreach { + case (fieldTypeInfo, i) => + val expectedTypeInfo = ct.getTypeAt(i) + if (fieldTypeInfo != expectedTypeInfo) { + throw new TableException(s"Result field does not match expected type. " + + s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo") + } + } + + // Atomic type expected + case at: AtomicType[_] => + val fieldTypeInfo = logicalFieldTypes.head --- End diff -- Add a check that `logicalFieldTypes.size() == 1` > Alias in front of output fails > ------------------------------ > > Key: FLINK-5662 > URL: https://issues.apache.org/jira/browse/FLINK-5662 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.2.0 > Reporter: Timo Walther > Assignee: Timo Walther > > Neither of the following snippets works: > {code} > public static void main(String[] args) throws Exception { > final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > DataSet<String> text = ... > BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); > // Table t = tEnv.fromDataSet(text, "text").select("text AS line"); > // Table t = tEnv.fromDataSet(text, "text").as("line"); > Table t = tEnv.fromDataSet(text, "text").select("text AS > line").select("line AS line"); > tEnv.toDataSet(t, MyPojo.class).print(); > } > public static class MyPojo { > public String line; > } > {code} > {code} > Exception in thread "main" org.apache.flink.table.api.TableException: POJO > does not define field name: text > at > org.apache.flink.table.typeutils.TypeConverter$$anonfun$2.apply(TypeConverter.scala:85) > at > org.apache.flink.table.typeutils.TypeConverter$$anonfun$2.apply(TypeConverter.scala:81) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.flink.table.typeutils.TypeConverter$.determineReturnType(TypeConverter.scala:81) > at > org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToExpectedType(BatchScan.scala:69) > at > org.apache.flink.table.plan.nodes.dataset.DataSetScan.translateToPlan(DataSetScan.scala:61) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:305) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:289) > at > org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146) > at groupId.WordCount.main(WordCount.java:67) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)