Hi Matthias, Before the bug is fixed, you could specify the return type explicitly in the second parameter of the map function.
DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i)); -> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i), new RowTypeInfo(Types.STRING, Types.INT)); Best, JING ZHANG Matthias Broecheler <matth...@dataeng.ai> 于2021年8月21日周六 上午12:40写道: > Thank you, Caizhi, for looking into this and identifying the source of the > bug. Is there a way to work around this at the API level until this bug is > resolved? Can I somehow "inject" the type? > > Thanks a lot for your help, > Matthias > > On Thu, Aug 19, 2021 at 10:15 PM Caizhi Weng <tsreape...@gmail.com> wrote: > >> Hi! >> >> I've created a JIRA ticket[1] for this issue. Please check it out and >> track the progress there. >> >> [1] https://issues.apache.org/jira/browse/FLINK-23885 >> >> Caizhi Weng <tsreape...@gmail.com> 于2021年8月20日周五 上午10:47写道: >> >>> Hi! >>> >>> This is because TypeExtractor#getMapReturnTypes are not dealing with row >>> types (see that method and also TypeExtractor#privateGetForClass). You >>> might want to open a JIRA ticket for this. >>> >>> Matthias Broecheler <matth...@dataeng.ai> 于2021年8月20日周五 上午7:01写道: >>> >>>> Hey Flinkers, >>>> >>>> I am trying to follow the docs >>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api> >>>> to >>>> convert a DataStream to a Table. Specifically, I have a DataStream of Row >>>> and want the columns of the row to become the columns of the resulting >>>> table. >>>> >>>> That works but only if I construct the Rows statically. If I construct >>>> them dynamically (in a map) then Flink turns the entire Row into one column >>>> of type "RAW('org.apache.flink.types.Row', '...')". >>>> >>>> Does anybody know why this is the case or how to fix it? Take a look at >>>> the simple Flink program below where I construct the DataStream "rows" in >>>> two different ways. I would expect those to be identical (and the sink does >>>> print identical information) but the inferred table schema is different. >>>> >>>> Thanks a ton, >>>> Matthias >>>> >>>> ------------------------------ >>>> >>>> StreamExecutionEnvironment flinkEnv = >>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>> flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); >>>> >>>> DataStream<Integer> integers = flinkEnv.fromElements(12, 5); >>>> >>>> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i)); >>>> >>>> // This alternative way of constructing this data stream produces the >>>> expected table schema >>>> // DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12), >>>> Row.of("Name5", 5)); >>>> >>>> StreamTableEnvironment tableEnv = >>>> StreamTableEnvironment.create(flinkEnv); >>>> Table table = tableEnv.fromDataStream(rows); >>>> table.printSchema(); >>>> >>>> rows.addSink(new PrintSinkFunction<>()); >>>> >>>> flinkEnv.execute(); >>>> >>>>