Hi! This is because TypeExtractor#getMapReturnTypes are not dealing with row types (see that method and also TypeExtractor#privateGetForClass). You might want to open a JIRA ticket for this.
Matthias Broecheler <matth...@dataeng.ai> 于2021年8月20日周五 上午7:01写道: > Hey Flinkers, > > I am trying to follow the docs > <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api> > to > convert a DataStream to a Table. Specifically, I have a DataStream of Row > and want the columns of the row to become the columns of the resulting > table. > > That works but only if I construct the Rows statically. If I construct > them dynamically (in a map) then Flink turns the entire Row into one column > of type "RAW('org.apache.flink.types.Row', '...')". > > Does anybody know why this is the case or how to fix it? Take a look at > the simple Flink program below where I construct the DataStream "rows" in > two different ways. I would expect those to be identical (and the sink does > print identical information) but the inferred table schema is different. > > Thanks a ton, > Matthias > > ------------------------------ > > StreamExecutionEnvironment flinkEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); > > DataStream<Integer> integers = flinkEnv.fromElements(12, 5); > > DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i)); > > // This alternative way of constructing this data stream produces the > expected table schema > // DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12), > Row.of("Name5", 5)); > > StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(flinkEnv); > Table table = tableEnv.fromDataStream(rows); > table.printSchema(); > > rows.addSink(new PrintSinkFunction<>()); > > flinkEnv.execute(); > >