[ https://issues.apache.org/jira/browse/FLINK-23865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404212#comment-17404212 ]
zoucao edited comment on FLINK-23865 at 8/25/21, 1:29 PM: ---------------------------------------------------------- Recently, I use the new interface proposed by [FLIP-136|https://issues.apache.org/jira/browse/FLINK-19976] to test interoperability between Table API and DataStream, I found it could cover most scenarios,but did not work well for nested pojo either. In new interface, we use 'SchemaTranslator' to help to reorder fields for pojo. For example, if the we want to convert Table to DataStream<SimplePojo>, and the type for upstream Table is *<b STRING, a INT>*, the class SimplePojo is {code:java} public static class SimplePojo { public String b; public int a; } {code} Flink will reoder SimplePojo's fields by alphabet order, the type of SimplePojo will be *<a INT, b STRING>*, then flink will get the projections, generate a new schema for Table and do a conversion. It's simple and neat, but it can not reorder the fields for a nested Pojo. For example, we want to convert Table to DataStream<NestedPojo>, and the type for upstream Table is *<b ROW<d STRING, c INT>, a INT>*, the NestedPojo is {code:java} public static class NestedPojo { public Inner b; public int a; } public static class Inner { d STRING; c INT; } {code} The type of NestedPojo will be *<a INT, b Structured<c INT, d STRING>>*, but we only convert Table type to *<a INT, b ROW<d STRING, c INT>>* Is this an known problem? cc [~twalthr],[~jark] was (Author: zoucao): Recently, I use the new interface proposed by [FLIP-136|https://issues.apache.org/jira/browse/FLINK-19976] to test interoperability between Table API and DataStream, I found it could cover most scenarios,but did not work well for nested pojo either. In new interface, we use 'SchemaTranslator' to help to reorder fields for pojo. For example, if the we want to convert Table to DataStream<SimplePojo>, and the type for upstream Table is *<b STRING, a INT>*, the class SimplePojo is {code:java} public static class SimplePojo { public String b; public int a; } {code} Flink will reoder SimplePojo's fields by alphabet order, the type of SimplePojo will be *<a INT, b STRING>*, then flink will get the projections, generate a new schema for Table and do a conversion. It's simple and neat, but it can not reorder the fields for a nested Pojo. For example, we want to convert Table to DataStream<NestedPojo>, and the type for upstream Table is *<b ROW<d STRING, c INT>, a INT>*, the NestedPojo is {code:java} public static class NestedPojo { public Inner b; public int a; } public static class Inner { d STRING; c INT; } {code} The type of NestedPojo will be *<a INT, b Structured<c INT, d STRING>>*, but we only convert Table type to *<a INT, b ROW<d STRING, c INT>>* Is this an known problem? cc [~tiwalter],[~jark] > Class cast error caused by nested Pojo in legacy outputConversion > ----------------------------------------------------------------- > > Key: FLINK-23865 > URL: https://issues.apache.org/jira/browse/FLINK-23865 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.13.2 > Reporter: zoucao > Priority: Major > > code: > {code:java} > Table table = tbEnv.fromValues(DataTypes.ROW( > DataTypes.FIELD("innerPojo", DataTypes.ROW(DataTypes.FIELD("c", > STRING()))), > DataTypes.FIELD("b", STRING()), > DataTypes.FIELD("a", INT())), > Row.of(Row.of("str-c"), "str-b", 1)); > DataStream<Pojo> pojoDataStream = tbEnv.toAppendStream(table, Pojo.class); > ----------------------------- > public static class Pojo{ > public InnerPojo innerPojo; > public String b; > public int a; > public Pojo() { > } > } > public static class InnerPojo { > public String c; > public InnerPojo() { > } > }{code} > error: > {code:java} > java.lang.ClassCastException: org.apache.flink.table.types.logical.IntType > cannot be cast to org.apache.flink.table.types.logical.RowType > java.lang.ClassCastException: org.apache.flink.table.types.logical.IntType > cannot be cast to org.apache.flink.table.types.logical.RowType at > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:163) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:155) > {code} > The fields of PojoTypeInfo is in the alphabet order, such that in > `expandPojoTypeToSchema`, 'pojoType' and 'queryLogicalType' should have own > index,but now we use the pojo field index to get 'queryLogicalType', this > will casue the field type mismatch. It should be fixed like : > {code:java} > val queryIndex = queryLogicalType.getFieldIndex(name) > val nestedLogicalType = > queryLogicalType.getFields()(queryIndex).getType.asInstanceOf[RowType]{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)