[ 
https://issues.apache.org/jira/browse/FLINK-20747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17254421#comment-17254421
 ] 

zengjinbo commented on FLINK-20747:
-----------------------------------

Hi Jark,

 This is the SQL that triggers the exception, but I can't provide the data.
{code:java}
-- Source/sink: per-order-line sales facts, read as a changelog from Kafka.
-- NOTE: the original started with "//", which is not a valid SQL comment in
-- Flink (only "--" and "/* */" are); it is removed so the statement parses.
CREATE TABLE dws_fact_day_org_pro_size_ord (
    order_no                string,
    order_dtl_id            string,
    brd_no                  string,
    brd_dtl_no              string,
    org_lno                 string,
    store_lno               string,
    store_brd               string,
    org_new_no              string,
    period_sdate            string,
    create_time             string,
    pro_no                  string,
    size_code               string,
    size_type_code          string,
    cust_no                 string,
    sal_biz_type            string,
    sal_mode_cate           string,
    sal_mode                string,
    sys_source              int,
    logistics_mode          int,
    sal_qty                 int,
    sal_amt                 decimal(38, 18),
    sal_nos_prm_amt         decimal(38, 18),
    sal_prm_amt             decimal(38, 18),
    table_source            int,
    status                  int,
    order_type              int,
    tag_price               decimal(38, 18),
    pro_prm                 decimal(38, 18),
    mem_no                  string,
    mem_name                string,
    mem_tel                 string,
    discount_rate_type      int,
    prom_no                 string,
    discount_rate_source_id string,
    discount_rate           decimal(38, 18),
    is_online               int,
    virtual_flag            int,
    -- Processing-time attribute, required by the temporal joins below.
    proctime AS proctime(),
    PRIMARY KEY (order_no, order_dtl_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'TOPIC_DWS_FACT_DAY_ORG_PRO_SIZE_ORD',
    'properties.group.id' = 'DWS_FACT_DAY_ORDER_KAFKA',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
-- Dimension table: organization attributes, served from Elasticsearch
-- (used as a lookup/temporal table in the join below).
CREATE TABLE DWS_DIM_ORG_ATTR (
    org_lno    string,
    multi_type string,
    store_brd  string
) WITH (
    'connector' = 'dmp-elasticsearch',
    'hosts' = 'http://127.0.0.1:9200',
    'index' = 'dws_dim_org_attr'
);
-- dim: product dimension table (category flag per product), served from
-- Elasticsearch as a lookup/temporal table.
CREATE TABLE DWS_DIM_PRO_ALLINFO_CATE_FLAG (
    pro_cate_flag int,
    pro_no        string
) WITH (
    'connector' = 'dmp-elasticsearch',
    'hosts' = 'http://127.0.0.1:9200',
    'index' = 'dws_dim_pro_allinfo_cate_flag'
);
-- Sales order detail - sales assistant: per-line assistant attribution,
-- read as a changelog from Kafka.
CREATE TABLE T04_POS_ORDER_ASSISTANT (
    id                  string,
    order_no            string,
    order_dtl_id        string,
    assistant_id        string,
    assistant_no        string,
    assistant_name      string,
    settle_share_amount decimal(38, 18),
    share_amount        decimal(38, 18),
    share_qty           decimal(38, 18),
    counts              int,
    order_type          int,
    zone_yyyymm         string,
    create_user         string,
    create_time         string,
    update_user         string,
    update_time         string,
    sharding_flag       string,
    assistant_shop_no   string,
    assistant_shop_name string,
    etl_create_time     string,
    etl_update_time     string,
    -- Processing-time attribute (declared but unused by the query below).
    proctime AS proctime(),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'TOPIC_POS_ORDER_ASSISTANT',
    'properties.group.id' = 'DWS_FACT_DAY_ORDER_KAFKA',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
-- Sink: daily per-order aggregates, upserted to Kafka.
-- FIX: in the original, the line comment after create_time ran into
-- "primary key (...)", commenting the PRIMARY KEY clause out; the PK is
-- now on its own line. Chinese column comments translated to English.
CREATE TABLE topic_dws_fact_day_order (
    org_lno              string,          -- original organization code
    store_lno            string,          -- original store/warehouse code
    period_sdate         string,          -- date as a string (YYYYMMDD)
    order_no             string,          -- order number
    upt_flag             int,             -- UPT flag (1: yes, 0: no)
    cr_flag              int,             -- CR flag (1: yes, 0: no)
    nos_num_flag         int,             -- receipt-count flag (1: yes, 0: no)
    store_brd            string,          -- store brand
    org_new_no           string,          -- latest org code (new store code / goods-management unit code)
    cust_sal_nos_qty     decimal(38, 18), -- per-receipt sales quantity
    cust_sal_nos_amt     decimal(38, 18), -- per-receipt sales amount
    cust_sal_nos_prm_amt decimal(38, 18), -- per-receipt list-price amount
    sal_nos_num          int,             -- receipt count
    create_time          string,          -- order creation date
    PRIMARY KEY (org_lno, store_lno, period_sdate, order_no) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'TOPIC_DWS_FACT_DAY_ORDER',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
-- Aggregate order lines into one row per order and upsert into the sink.
-- FIXES vs. the original:
--   1. "topic_dws_fact_day_orderselect" — the table name was fused with
--      SELECT (missing whitespace), a parse error.
--   2. Implicit comma joins with predicates in WHERE are rewritten as
--      explicit INNER JOIN ... ON; this is the documented form for Flink
--      processing-time temporal joins (FOR SYSTEM_TIME AS OF).
INSERT INTO topic_dws_fact_day_order
SELECT
    t2.org_lno,
    t1.store_lno,
    t1.period_sdate,
    t1.order_no,
    -- UPT flag: brand-dependent thresholds on per-order quantity/amount.
    CASE
        WHEN max(t2.multi_type) IN ('单品', 'NBA')
             AND t2.store_brd IN ('AD', 'AS', 'AO', 'AK', 'RB')
             AND (sum(t1.sal_qty) <= 5 OR sum(t1.sal_amt) <= 3000) THEN 1
        WHEN max(t2.multi_type) IN ('单品', 'NBA')
             AND t2.store_brd IN ('AD', 'AS', 'AO', 'AK', 'RB')
             AND (sum(t1.sal_qty) > 5 AND sum(t1.sal_amt) > 3000) THEN 0
        WHEN max(t2.multi_type) IN ('单品', 'NBA')
             AND t2.store_brd IN ('NK') THEN 1
        WHEN max(t2.multi_type) IN ('单品', 'NBA')
             AND t2.store_brd NOT IN ('NK', 'AD', 'AS', 'AO', 'AK', 'RB')
             AND sum(t1.sal_qty) < 10 THEN 1
        WHEN max(t2.multi_type) IN ('单品', 'NBA')
             AND t2.store_brd NOT IN ('NK', 'AD', 'AS', 'AO', 'AK', 'RB')
             AND sum(t1.sal_qty) >= 10 THEN 0
        WHEN max(t2.multi_type) = '多品' AND sum(t1.sal_qty) < 15 THEN 1
        ELSE 0
    END AS upt_flag,
    -- CR flag: 1 only when the order is offline, not from system source 6,
    -- and has a non-zero virtual flag.
    max(CASE t1.is_online
            WHEN 1 THEN 0
            ELSE CASE t1.sys_source
                     WHEN 6 THEN 0
                     ELSE CASE t1.virtual_flag WHEN 0 THEN 0 ELSE 1 END
                 END
        END) AS cr_flag,
    1 AS nos_num_flag,
    t2.store_brd,
    t1.org_new_no,
    cast(sum(t1.sal_qty) AS decimal(38, 18)) AS cust_sal_nos_qty,
    -- Amounts only count product lines flagged pro_cate_flag = 1.
    cast(sum(CASE WHEN pro_cate_flag = 1 THEN t1.sal_amt ELSE 0 END)
         AS decimal(38, 18)) AS cust_sal_nos_amt,
    cast(sum(CASE WHEN pro_cate_flag = 1 THEN t1.sal_nos_prm_amt ELSE 0 END)
         AS decimal(38, 18)) AS cust_sal_nos_prm_amt,
    1 AS sal_nos_num,
    t1.create_time
FROM dws_fact_day_org_pro_size_ord AS t1
-- Processing-time temporal lookup of the org dimension.
INNER JOIN DWS_DIM_ORG_ATTR FOR SYSTEM_TIME AS OF t1.proctime AS t2
    ON t1.org_lno = t2.org_lno
-- Processing-time temporal lookup of the product dimension.
INNER JOIN DWS_DIM_PRO_ALLINFO_CATE_FLAG FOR SYSTEM_TIME AS OF t1.proctime AS t3
    ON t1.pro_no = t3.pro_no
-- Deduplicated order lines that have an assistant attributed.
INNER JOIN (
    SELECT order_no, order_dtl_id
    FROM T04_POS_ORDER_ASSISTANT
    GROUP BY order_no, order_dtl_id
) AS t4
    ON t1.order_no = t4.order_no
   AND t1.order_dtl_id = t4.order_dtl_id
WHERE t1.table_source = 1
  AND t1.order_type = 0
GROUP BY
    t2.org_lno,
    t1.store_lno,
    t1.period_sdate,
    t1.order_no,
    t2.store_brd,
    t1.org_new_no,
    t1.create_time
HAVING sum(sal_qty) <> 0
    OR sum(CASE pro_cate_flag WHEN 1 THEN t1.sal_amt ELSE 0 END) <> 0;
{code}

> ClassCastException when using MAX aggregate function
> ----------------------------------------------------
>
>                 Key: FLINK-20747
>                 URL: https://issues.apache.org/jira/browse/FLINK-20747
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0
>            Reporter: zengjinbo
>            Priority: Critical
>             Fix For: 1.12.0, 1.13.0
>
>         Attachments: image-2020-12-23-18-04-21-079.png
>
>
> During the process of upgrading 1.12.0, I found that Flink SQL is not 
> compatible with 1.11.1 
> java.lang.ClassCastException: java.lang.Integer cannot be cast to 
> org.apache.flink.table.data.StringDatajava.lang.ClassCastException: 
> java.lang.Integer cannot be cast to org.apache.flink.table.data.StringData at 
> org$apache$flink$table$planner$functions$aggfunctions$MaxWithRetractAggFunction$MaxWithRetractAccumulator$Converter.toInternal(Unknown
>  Source) at 
> org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
>  at 
> org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
>  at 
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
>  at GroupAggsHandler$875.getAccumulators(Unknown Source) at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
>  at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
>  at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>  
>  
> !image-2020-12-23-18-04-21-079.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to