[ https://issues.apache.org/jira/browse/FLINK-23865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404212#comment-17404212 ]

zoucao commented on FLINK-23865:
--------------------------------

Recently, I used the new interface proposed by 
[FLIP-136|https://issues.apache.org/jira/browse/FLINK-19976] to test the 
interoperability between the Table API and DataStream. I found it covers most 
scenarios, but it does not work well for nested POJOs either. In the new 
interface, we use 'SchemaTranslator' to reorder the fields of a POJO.
For example, if we want to convert a Table to DataStream<SimplePojo>, and the 
type of the upstream Table is *<b STRING, a INT>*, the class SimplePojo is
{code:java}
public static class SimplePojo {
    public String b;
    public int a;
}
{code}
Flink will reorder SimplePojo's fields into alphabetical order, so the type of 
SimplePojo becomes *<a INT, b STRING>*; Flink then derives the projections, 
generates a new schema for the Table, and performs the conversion. It's simple 
and neat, but it cannot reorder the fields of a nested POJO.
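To make the flat case concrete, here is a minimal sketch of the working 
conversion (the environment/table names are my own, assuming Flink 1.13+ where 
StreamTableEnvironment#toDataStream(Table, Class) from FLIP-136 is available):
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// upstream Table of type <b STRING, a INT>
Table table = tableEnv.fromValues(
        DataTypes.ROW(
                DataTypes.FIELD("b", DataTypes.STRING()),
                DataTypes.FIELD("a", DataTypes.INT())),
        Row.of("str-b", 1));

// SchemaTranslator reorders the POJO fields alphabetically to <a INT, b STRING>,
// projects the Table accordingly, and the conversion succeeds
DataStream<SimplePojo> stream = tableEnv.toDataStream(table, SimplePojo.class);
{code}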
For example, suppose we want to convert a Table to DataStream<NestedPojo>, and 
the type of the upstream Table is *<b ROW<d STRING, c INT>, a INT>*; the 
NestedPojo is
{code:java}
public static class NestedPojo {
    public Inner b;
    public int a;
}
public static class Inner {
    public String d;
    public int c;
}
{code}
The type of NestedPojo will be *<a INT, b Structured<c INT, d STRING>>*, but we 
only convert the Table type to *<a INT, b ROW<d STRING, c INT>>*.
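A corresponding sketch for the nested case (same assumptions and hypothetical 
names as above):
{code:java}
// upstream Table of type <b ROW<d STRING, c INT>, a INT>
Table table = tableEnv.fromValues(
        DataTypes.ROW(
                DataTypes.FIELD("b", DataTypes.ROW(
                        DataTypes.FIELD("d", DataTypes.STRING()),
                        DataTypes.FIELD("c", DataTypes.INT()))),
                DataTypes.FIELD("a", DataTypes.INT())),
        Row.of(Row.of("str-d", 2), 1));

// the top-level fields are reordered to <a, b>, but the nested row stays
// <d STRING, c INT> instead of matching Inner's structured type <c INT, d STRING>
DataStream<NestedPojo> stream = tableEnv.toDataStream(table, NestedPojo.class);
{code}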
Is this a known problem?
cc [~tiwalter], [~jark]

> Class cast error caused by nested Pojo in legacy outputConversion
> -----------------------------------------------------------------
>
>                 Key: FLINK-23865
>                 URL: https://issues.apache.org/jira/browse/FLINK-23865
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.2
>            Reporter: zoucao
>            Priority: Major
>
> code:
> {code:java}
> Table table = tbEnv.fromValues(DataTypes.ROW(
>         DataTypes.FIELD("innerPojo", DataTypes.ROW(DataTypes.FIELD("c", STRING()))),
>         DataTypes.FIELD("b", STRING()),
>         DataTypes.FIELD("a", INT())),
>         Row.of(Row.of("str-c"), "str-b", 1));
> DataStream<Pojo> pojoDataStream = tbEnv.toAppendStream(table, Pojo.class);
> -----------------------------
> public static class Pojo {
>     public InnerPojo innerPojo;
>     public String b;
>     public int a;
>     public Pojo() {
>     }
> }
> public static class InnerPojo {
>     public String c;
>     public InnerPojo() {
>     }
> }{code}
> error:
> {code:java}
> java.lang.ClassCastException: org.apache.flink.table.types.logical.IntType cannot be cast to org.apache.flink.table.types.logical.RowType
>     at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:163)
>     at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:155)
> {code}
> The fields of PojoTypeInfo are in alphabetical order, so in 
> `expandPojoTypeToSchema`, 'pojoType' and 'queryLogicalType' should each be 
> looked up by their own index. Currently the POJO field index is used to fetch 
> 'queryLogicalType', which causes the field type mismatch. It should be fixed 
> like:
> {code:java}
> val queryIndex = queryLogicalType.getFieldIndex(name)
> val nestedLogicalType = queryLogicalType.getFields()(queryIndex).getType.asInstanceOf[RowType]
> {code}


