Perfect, that worked.

Thanks a lot, JING!
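
For anyone who finds this thread later: as far as I understand Caizhi's
explanation, a Row carries its field types only on the instance, not in its
class, so a type extractor that sees just the lambda's result class has
nothing to build columns from. Below is a minimal plain-Java sketch of that
idea. MiniRow, describeFromClass and describeFromInstance are made-up names
for illustration only; they are not Flink classes or methods, and the real
logic lives in TypeExtractor.

```java
import java.util.Arrays;
import java.util.function.Function;
import java.util.stream.Collectors;

public class RowTypeSketch {

    /** Hypothetical stand-in for a row: field types exist only on the instance. */
    public static final class MiniRow {
        final Object[] fields;

        MiniRow(Object... fields) {
            this.fields = fields;
        }

        public static MiniRow of(Object... fields) {
            return new MiniRow(fields);
        }
    }

    /** From the class alone, only the class name is known, not the field types. */
    public static String describeFromClass(Class<?> clazz) {
        return "RAW(" + clazz.getSimpleName() + ")";
    }

    /** With a concrete instance, per-field types can be read off (assumes non-null fields). */
    public static String describeFromInstance(MiniRow row) {
        return Arrays.stream(row.fields)
                .map(f -> f.getClass().getSimpleName())
                .collect(Collectors.joining(", ", "ROW(", ")"));
    }

    public static void main(String[] args) {
        // Static construction: an element is available before the job runs.
        System.out.println(describeFromInstance(MiniRow.of("Name12", 12)));

        // Construction inside a lambda: before it runs, only the result class is known.
        Function<Integer, MiniRow> mapper = i -> MiniRow.of("Name" + i, i);
        System.out.println(describeFromClass(MiniRow.class));

        // The field types only show up once the lambda has actually produced a value.
        System.out.println(describeFromInstance(mapper.apply(5)));
    }
}
```

Passing the RowTypeInfo explicitly, as JING suggested, supplies the field
types that cannot be read from the class.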

On Sun, Aug 22, 2021 at 1:25 AM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Matthias,
> Before the bug is fixed, you could specify the return type explicitly in
> the second parameter of the map function.
>
> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));
>
> becomes
>
> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i),
>         new RowTypeInfo(Types.STRING, Types.INT));
>
> Best,
> JING ZHANG
>
>
>
> On Sat, Aug 21, 2021 at 12:40 AM Matthias Broecheler <matth...@dataeng.ai> wrote:
>
>> Thank you, Caizhi, for looking into this and identifying the source of
>> the bug. Is there a way to work around this at the API level until this bug
>> is resolved? Can I somehow "inject" the type?
>>
>> Thanks a lot for your help,
>> Matthias
>>
>> On Thu, Aug 19, 2021 at 10:15 PM Caizhi Weng <tsreape...@gmail.com>
>> wrote:
>>
>>> Hi!
>>>
>>> I've created a JIRA ticket[1] for this issue. Please check it out and
>>> track the progress there.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23885
>>>
>>> On Fri, Aug 20, 2021 at 10:47 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> This is because TypeExtractor#getMapReturnTypes does not handle row
>>>> types (see that method and also TypeExtractor#privateGetForClass). You
>>>> might want to open a JIRA ticket for this.
>>>>
>>>> On Fri, Aug 20, 2021 at 7:01 AM Matthias Broecheler <matth...@dataeng.ai> wrote:
>>>>
>>>>> Hey Flinkers,
>>>>>
>>>>> I am trying to follow the docs
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api>
>>>>> to convert a DataStream to a Table. Specifically, I have a DataStream of Row
>>>>> and want the columns of the row to become the columns of the resulting
>>>>> table.
>>>>>
>>>>> That works but only if I construct the Rows statically. If I construct
>>>>> them dynamically (in a map) then Flink turns the entire Row into one
>>>>> column of type "RAW('org.apache.flink.types.Row', '...')".
>>>>>
>>>>> Does anybody know why this is the case or how to fix it? Take a look
>>>>> at the simple Flink program below where I construct the DataStream "rows"
>>>>> in two different ways. I would expect those to be identical (and the sink
>>>>> does print identical information) but the inferred table schema is
>>>>> different.
>>>>>
>>>>> Thanks a ton,
>>>>> Matthias
>>>>>
>>>>> ------------------------------
>>>>>
>>>>>         StreamExecutionEnvironment flinkEnv =
>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>>>
>>>>>         DataStream<Integer> integers = flinkEnv.fromElements(12, 5);
>>>>>
>>>>>         DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));
>>>>>
>>>>> //  This alternative way of constructing this data stream produces the
>>>>> //  expected table schema
>>>>> //      DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12),
>>>>> //              Row.of("Name5", 5));
>>>>>
>>>>>         StreamTableEnvironment tableEnv =
>>>>>                 StreamTableEnvironment.create(flinkEnv);
>>>>>         Table table = tableEnv.fromDataStream(rows);
>>>>>         table.printSchema();
>>>>>
>>>>>         rows.addSink(new PrintSinkFunction<>());
>>>>>
>>>>>         flinkEnv.execute();
>>>>>
>>>>>
