[ https://issues.apache.org/jira/browse/FLINK-21247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu updated FLINK-21247:
----------------------------
    Fix Version/s: 1.13.0
                   1.12.2

> flink iceberg table map<string,string> cannot convert to datastream
> -------------------------------------------------------------------
>
>                 Key: FLINK-21247
>                 URL: https://issues.apache.org/jira/browse/FLINK-21247
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Ecosystem
>    Affects Versions: 1.12.1
>         Environment: iceberg master
> flink 1.12
>
>            Reporter: donglei
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.2, 1.13.0
>
>         Attachments: image-2021-02-03-15-38-42-340.png,
> image-2021-02-03-15-40-27-055.png, image-2021-02-03-15-41-34-426.png,
> image-2021-02-03-15-43-19-919.png, image-2021-02-03-15-52-12-493.png,
> image-2021-02-03-15-53-18-244.png
>
>
> We have a Flink Iceberg table with a map<string,string> column:
> !image-2021-02-03-15-38-42-340.png!
>
> We want to read the table like this:
>
> String querySql = "SELECT "
>     + "ftime,extinfo,country,province,operator,apn,gw,src_ip_head,info_str,"
>     + "product_id,app_version,sdk_id,sdk_version,hardware_os,qua,upload_ip,"
>     + "client_ip,upload_apn,event_code,event_result,package_size,consume_time,"
>     + "event_value,event_time,upload_time,boundle_id,uin,platform,os_version,"
>     + "channel,brand,model"
>     + " from bfzt3 ";
> Table table = tEnv.sqlQuery(querySql);
> // "map" is defined elsewhere and is not shown in this report
> DataStream<AttaInfo> sinkStream = tEnv.toAppendStream(table,
>     Types.POJO(AttaInfo.class, map));
> // count the rows: map every record to 1 and sum under a single key
> sinkStream.map(x -> 1).returns(Types.INT)
>     .keyBy(new NullByteKeySelector<Integer>())
>     .reduce((x, y) -> x + y)
>     .print();
>
> When reading the table, we get this exception:
>
> 2021-02-03 15:37:57
> java.lang.ClassCastException: org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableMapData cannot be cast to org.apache.flink.table.data.binary.BinaryMapData
>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:107)
>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:47)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:166)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:129)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
>
> We found that Iceberg represents the map as ReusableMapData, which implements MapData:
> !image-2021-02-03-15-40-27-055.png!
>
> This is the exception:
> !image-2021-02-03-15-41-34-426.png!
> MapData has two default implementations, GenericMapData and BinaryMapData,
> but the Iceberg implementation is ReusableMapData, so the cast to
> BinaryMapData in MapDataSerializer.copy fails.
>
> So I think the code should be changed like this:
> !image-2021-02-03-15-43-19-919.png!
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
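
The failure in the report comes down to a copy routine that casts an interface value to one concrete implementation. The sketch below shows that pattern in isolation, next to a defensive alternative that falls back to rebuilding the value through the interface. All types here (SimpleMapData, BinaryLikeMap, ReaderOwnedMap) are hypothetical stand-ins rather than Flink or Iceberg classes, and this is not necessarily the change merged for FLINK-21247.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MapCopySketch {

    /** Hypothetical stand-in for an interface such as MapData. */
    interface SimpleMapData {
        int size();
        Map<String, String> toJavaMap();
    }

    /** Plays the role of the serializer's preferred format (like BinaryMapData). */
    static final class BinaryLikeMap implements SimpleMapData {
        private final Map<String, String> entries;

        BinaryLikeMap(Map<String, String> entries) {
            this.entries = new HashMap<>(entries);
        }

        public int size() {
            return entries.size();
        }

        public Map<String, String> toJavaMap() {
            return new HashMap<>(entries);
        }

        BinaryLikeMap copy() {
            return new BinaryLikeMap(entries);
        }
    }

    /** Plays the role of a connector-specific implementation (like ReusableMapData). */
    static final class ReaderOwnedMap implements SimpleMapData {
        private final Map<String, String> entries;

        ReaderOwnedMap(Map<String, String> entries) {
            this.entries = new HashMap<>(entries);
        }

        public int size() {
            return entries.size();
        }

        public Map<String, String> toJavaMap() {
            return new HashMap<>(entries);
        }
    }

    /** Assumes every input is a BinaryLikeMap; throws ClassCastException otherwise. */
    static SimpleMapData copyWithBlindCast(SimpleMapData from) {
        return ((BinaryLikeMap) from).copy();
    }

    /** Uses the fast path when possible, otherwise rebuilds via the interface. */
    static SimpleMapData copyDefensively(SimpleMapData from) {
        if (from instanceof BinaryLikeMap) {
            return ((BinaryLikeMap) from).copy();
        }
        return new BinaryLikeMap(from.toJavaMap());
    }

    public static void main(String[] args) {
        SimpleMapData fromReader = new ReaderOwnedMap(Collections.singletonMap("k", "v"));

        try {
            copyWithBlindCast(fromReader);
        } catch (ClassCastException e) {
            System.out.println("blind cast failed: " + e.getMessage());
        }

        SimpleMapData copied = copyDefensively(fromReader);
        System.out.println("defensive copy: " + copied.toJavaMap());
    }
}
```

The defensive variant keeps the cheap copy for the expected implementation and only pays for a conversion when some other implementation of the interface shows up, which is the situation the stack trace above describes.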