Hi!

This is because TypeExtractor#getMapReturnTypes are not dealing with row
types (see that method and also TypeExtractor#privateGetForClass). You
might want to open a JIRA ticket for this.

Matthias Broecheler <matth...@dataeng.ai> 于2021年8月20日周五 上午7:01写道:

> Hey Flinkers,
>
> I am trying to follow the docs
> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api>
>  to
> convert a DataStream to a Table. Specifically, I have a DataStream of Row
> and want the columns of the row to become the columns of the resulting
> table.
>
> That works but only if I construct the Rows statically. If I construct
> them dynamically (in a map) then Flink turns the entire Row into one column
> of type "RAW('org.apache.flink.types.Row', '...')".
>
> Does anybody know why this is the case or how to fix it? Take a look at
> the simple Flink program below where I construct the DataStream "rows" in
> two different ways. I would expect those to be identical (and the sink does
> print identical information) but the inferred table schema is different.
>
> Thanks a ton,
> Matthias
>
> ------------------------------
>
>         StreamExecutionEnvironment flinkEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>
>         DataStream<Integer> integers = flinkEnv.fromElements(12, 5);
>
>         DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));
>
> //  This alternative way of constructing this data stream produces the 
> expected table schema
> //      DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12), 
> Row.of("Name5", 5));
>
>         StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(flinkEnv);
>         Table table = tableEnv.fromDataStream(rows);
>         table.printSchema();
>
>         rows.addSink(new PrintSinkFunction<>());
>
>         flinkEnv.execute();
>
>

Reply via email to