[ 
https://issues.apache.org/jira/browse/FLINK-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16997176#comment-16997176
 ] 

Jark Wu commented on FLINK-15283:
---------------------------------

Hi [~rolandWang], actually, this is not a bug in Flink. In the SQL world, when 
inserting the result of a query into a table, query fields are mapped to sink 
fields by position, not by column name. That is because the column names of a 
query may be missing, and they are hard to keep identical to the sink's. 
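
To illustrate the difference, here is a minimal sketch in plain Scala with no 
Flink dependency. Field, matchesByPosition and reorderToSink are hypothetical 
names made up for this example, not Flink APIs, and the type check is 
simplified to string equality. The schemas are the ones from the report below.

```scala
// Illustration only: Field and both helpers are hypothetical, not Flink APIs.
final case class Field(name: String, dataType: String)

object PositionalMappingSketch {
  // Positional check: the i-th query field must match the i-th sink field.
  // Field names are ignored on purpose.
  def matchesByPosition(query: Seq[Field], sink: Seq[Field]): Boolean =
    query.length == sink.length &&
      query.zip(sink).forall { case (q, s) => q.dataType == s.dataType }

  // Reorders the query fields into the sink's column order by name.
  // Returns None if the two schemas do not have the same set of columns.
  def reorderToSink(query: Seq[Field], sink: Seq[Field]): Option[Seq[Field]] = {
    val byName = query.map(f => f.name -> f).toMap
    val reordered = sink.flatMap(s => byName.get(s.name))
    if (query.length == sink.length && reordered.length == sink.length) Some(reordered)
    else None
  }

  // Schemas taken from the error message in the issue description.
  val sink: Seq[Field] = Seq(
    Field("BAK_NO", "String"), Field("TRANS_AMT", "Double"), Field("ORDER_NO", "String"))
  val query: Seq[Field] = Seq(
    Field("ORDER_NO", "String"), Field("BAK_NO", "String"), Field("TRANS_AMT", "Double"))
}
```

With these schemas the positional check fails on the second field (String 
against Double), and it passes once the query fields are listed in the sink's 
order. In practice that means writing the SELECT columns in the same order as 
the sink's schema.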

> Scala version of TableSinkUtils has a problem when validating sinks.
> --------------------------------------------------------------------
>
>                 Key: FLINK-15283
>                 URL: https://issues.apache.org/jira/browse/FLINK-15283
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Scala
>    Affects Versions: 1.9.0
>         Environment: All environments of flink 1.9.0
>            Reporter: roland wang
>            Priority: Major
>
> *1. Phenomenon*
> I created a Kafka sink with a schema like:
> {code:java}
> [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
> {code}
> When I tried to insert some data into this sink, an error occurred as follows:
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Field types of 
> query result and registered TableSink [TEST_SINK] do not match. Query result 
> schema: [ORDER_NO: String, BAK_NO: String, TRANS_AMT: Double] TableSink 
> schema: [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
> {code}
> Now I have to keep the query schema in exactly the same order as the sink's 
> schema, which causes a lot of trouble.
> *2. Cause*
> I checked the code and found this snippet:
> {code:java}
> // validate schema of source table and table sink
> val srcFieldTypes = query.getTableSchema.getFieldDataTypes
> val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes
> if (srcFieldTypes.length != sinkFieldTypes.length ||
>   srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) =>
>     !PlannerTypeUtils.isInteroperable(
>       fromDataTypeToLogicalType(srcF), fromDataTypeToLogicalType(snkF))
>   }) {
> ...{code}
> I think that when they compare the sink's schema to the query's schema, the 
> zip goes wrong because they forget to sort both schemas.
> I truly hope this bug can be fixed soon.
> Thanks for all your hard work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
