[ https://issues.apache.org/jira/browse/FLINK-20747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17254421#comment-17254421 ]
zengjinbo commented on FLINK-20747: ----------------------------------- Hi Jark: this is the code that triggers the exception, but I can't provide the data. {code:java} // create table dws_fact_day_org_pro_size_ord( order_no string, order_dtl_id string, brd_no string, brd_dtl_no string, org_lno string, store_lno string, store_brd string, org_new_no string, period_sdate string, create_time string, pro_no string, size_code string, size_type_code string, cust_no string, sal_biz_type string, sal_mode_cate string, sal_mode string, sys_source int, logistics_mode int, sal_qty int, sal_amt decimal(38, 18), sal_nos_prm_amt decimal(38, 18), sal_prm_amt decimal(38, 18), table_source int, status int, order_type int, tag_price decimal(38, 18), pro_prm decimal(38, 18), mem_no string, mem_name string, mem_tel string, discount_rate_type int, prom_no string, discount_rate_source_id string, discount_rate decimal(38, 18), is_online int, virtual_flag int, proctime as proctime(), primary key(order_no,order_dtl_id)not enforced)WITH ( 'connector' = 'upsert-kafka', 'topic' = 'TOPIC_DWS_FACT_DAY_ORG_PRO_SIZE_ORD', 'properties.group.id' = 'DWS_FACT_DAY_ORDER_KAFKA', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'key.format' = 'json', 'value.format' = 'json' ); create table DWS_DIM_ORG_ATTR( org_lno string , multi_type string , store_brd string) with ( 'connector' = 'dmp-elasticsearch','hosts' = 'http://127.0.0.1:9200','index' = 'dws_dim_org_attr'); --dim_产品维表create table DWS_DIM_PRO_ALLINFO_CATE_FLAG(pro_cate_flag int,pro_no string) with ( 'connector' = 'dmp-elasticsearch','hosts' = 'http://127.0.0.1:9200','index' = 'dws_dim_pro_allinfo_cate_flag'); --销售订单明细-营业员 create table T04_POS_ORDER_ASSISTANT ( id string , order_no string , order_dtl_id string , assistant_id string , assistant_no string , assistant_name string , settle_share_amount decimal(38,18) , share_amount decimal(38,18) , share_qty decimal(38,18) , counts int , order_type int , zone_yyyymm string , create_user string , create_time string , 
update_user string , update_time string , sharding_flag string , assistant_shop_no string , assistant_shop_name string , etl_create_time string , etl_update_time string , proctime as proctime(), primary key(id)not enforced )WITH ( 'connector' = 'upsert-kafka', 'topic' = 'TOPIC_POS_ORDER_ASSISTANT', 'properties.group.id' = 'DWS_FACT_DAY_ORDER_KAFKA', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'key.format' = 'json', 'value.format' = 'json' ); create table topic_dws_fact_day_order ( org_lno string , -- 机构原编码 store_lno string , -- 店仓原编码 period_sdate string , -- 字符串日期(YYYYMMDD) order_no string , -- 订单编号 upt_flag int , -- UPT标识(1:是,0:否) cr_flag int , -- CR标识(1:是,0:否) nos_num_flag int , -- 小票数标识(1:是,0:否) store_brd string , -- 店铺品牌 org_new_no string , -- 机构最新编码(新店店铺编码/货管单位编码) cust_sal_nos_qty decimal(38,18) , -- 客单销售量 cust_sal_nos_amt decimal(38,18) , -- 客单销售额 cust_sal_nos_prm_amt decimal(38,18) , -- 客单销售单据牌价额 sal_nos_num int , -- 客单数 create_time string , -- 制单日期 primary key (org_lno,store_lno,period_sdate,order_no) not enforced ) with ( 'connector' = 'upsert-kafka', 'topic' = 'TOPIC_DWS_FACT_DAY_ORDER', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'key.format' = 'json', 'value.format' = 'json' ); insert into topic_dws_fact_day_orderselect t2.org_lno,t1.store_lno,t1.period_sdate,t1.order_no, case when max(t2.multi_type) in ('单品','NBA') and t2.store_brd in ('AD','AS','AO','AK','RB') and (sum(t1.sal_qty) <= 5 or sum(t1.sal_amt) <= 3000) then 1 when max(t2.multi_type) in ('单品','NBA') and t2.store_brd in ('AD','AS','AO','AK','RB') and (sum(t1.sal_qty) > 5 and sum(t1.sal_amt) > 3000) then 0 when max(t2.multi_type) in ('单品','NBA') and t2.store_brd in ('NK') then 1 when max(t2.multi_type) in ('单品','NBA') and t2.store_brd not in ('NK','AD','AS','AO','AK','RB') and sum(t1.sal_qty) < 10 then 1 when max(t2.multi_type) in ('单品','NBA') and t2.store_brd not in ('NK','AD','AS','AO','AK','RB') and sum(t1.sal_qty) >= 10 then 0 when max(t2.multi_type)='多品' and sum(t1.sal_qty) 
< 15 then 1 else 0 end upt_flag, max(case t1.is_online when 1 then 0 else case t1.sys_source when 6 then 0 else case t1.virtual_flag when 0 then 0 else 1 end end end ) cr_flag, 1 as nos_num_flag, t2.store_brd, t1.org_new_no, cast(sum(t1.sal_qty) as decimal(38,18)) as cust_sal_nos_qty, cast(sum(case when pro_cate_flag = 1 then t1.sal_amt else 0 end) as decimal(38,18)) as cust_sal_nos_amt, cast(sum(case when pro_cate_flag =1 then t1.sal_nos_prm_amt else 0 end) as decimal(38,18)) as cust_sal_nos_prm_amt, 1 as sal_nos_num, t1.create_time from dws_fact_day_org_pro_size_ord t1, DWS_DIM_ORG_ATTR for system_time as of t1.proctime as t2, DWS_DIM_PRO_ALLINFO_CATE_FLAG for system_time as of t1.proctime as t3, (select order_no,order_dtl_id from T04_POS_ORDER_ASSISTANT group by order_no,order_dtl_id) t4 where t1.org_lno = t2.org_lno and t1.pro_no = t3.pro_no and t1.order_no = t4.order_no and t1.order_dtl_id = t4.order_dtl_id and t1.table_source=1 and t1.order_type = 0 group by t2.org_lno,t1.store_lno,t1.period_sdate,t1.order_no,t2.store_brd,t1.org_new_no,t1.create_time having sum(sal_qty)<>0 or sum(case pro_cate_flag when 1 then t1.sal_amt else 0 end)<>0; {code} > ClassCastException when using MAX aggregate function > ---------------------------------------------------- > > Key: FLINK-20747 > URL: https://issues.apache.org/jira/browse/FLINK-20747 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.12.0 > Reporter: zengjinbo > Priority: Critical > Fix For: 1.12.0, 1.13.0 > > Attachments: image-2020-12-23-18-04-21-079.png > > > During the process of upgrading 1.12.0, I found that Flink SQL is not > compatible with 1.11.1 > java.lang.ClassCastException: java.lang.Integer cannot be cast to > org.apache.flink.table.data.StringDatajava.lang.ClassCastException: > java.lang.Integer cannot be cast to org.apache.flink.table.data.StringData at > 
org$apache$flink$table$planner$functions$aggfunctions$MaxWithRetractAggFunction$MaxWithRetractAccumulator$Converter.toInternal(Unknown > Source) at > org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92) > at > org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47) > at > org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59) > at GroupAggsHandler$875.getAccumulators(Unknown Source) at > org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175) > at > org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) > > > !image-2020-12-23-18-04-21-079.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)