Hi Matthias,
Before the bug is fixed, you could specify the return type explicitly in
the second parameter of the map function.

DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));   ->

DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i), new
RowTypeInfo(Types.STRING, Types.INT));

Best,
JING ZHANG



Matthias Broecheler <matth...@dataeng.ai> 于2021年8月21日周六 上午12:40写道:

> Thank you, Caizhi, for looking into this and identifying the source of the
> bug. Is there a way to work around this at the API level until this bug is
> resolved? Can I somehow "inject" the type?
>
> Thanks a lot for your help,
> Matthias
>
> On Thu, Aug 19, 2021 at 10:15 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> I've created a JIRA ticket[1] for this issue. Please check it out and
>> track the progress there.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23885
>>
>> Caizhi Weng <tsreape...@gmail.com> 于2021年8月20日周五 上午10:47写道:
>>
>>> Hi!
>>>
>>> This is because TypeExtractor#getMapReturnTypes are not dealing with row
>>> types (see that method and also TypeExtractor#privateGetForClass). You
>>> might want to open a JIRA ticket for this.
>>>
>>> Matthias Broecheler <matth...@dataeng.ai> 于2021年8月20日周五 上午7:01写道:
>>>
>>>> Hey Flinkers,
>>>>
>>>> I am trying to follow the docs
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api>
>>>>  to
>>>> convert a DataStream to a Table. Specifically, I have a DataStream of Row
>>>> and want the columns of the row to become the columns of the resulting
>>>> table.
>>>>
>>>> That works but only if I construct the Rows statically. If I construct
>>>> them dynamically (in a map) then Flink turns the entire Row into one column
>>>> of type "RAW('org.apache.flink.types.Row', '...')".
>>>>
>>>> Does anybody know why this is the case or how to fix it? Take a look at
>>>> the simple Flink program below where I construct the DataStream "rows" in
>>>> two different ways. I would expect those to be identical (and the sink does
>>>> print identical information) but the inferred table schema is different.
>>>>
>>>> Thanks a ton,
>>>> Matthias
>>>>
>>>> ------------------------------
>>>>
>>>>         StreamExecutionEnvironment flinkEnv = 
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>>
>>>>         DataStream<Integer> integers = flinkEnv.fromElements(12, 5);
>>>>
>>>>         DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));
>>>>
>>>> //  This alternative way of constructing this data stream produces the 
>>>> expected table schema
>>>> //      DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12), 
>>>> Row.of("Name5", 5));
>>>>
>>>>         StreamTableEnvironment tableEnv = 
>>>> StreamTableEnvironment.create(flinkEnv);
>>>>         Table table = tableEnv.fromDataStream(rows);
>>>>         table.printSchema();
>>>>
>>>>         rows.addSink(new PrintSinkFunction<>());
>>>>
>>>>         flinkEnv.execute();
>>>>
>>>>

Reply via email to