Re:Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-02 Thread sunfulin




Hi,
Yep, I am using 1.10.
Did you submit the job to a cluster, or just run it in your IDE? I can also
run it successfully in my IDE, but it fails when I submit it to a cluster as a
shaded jar. So I suspect the problem is related to the Maven jar classpath,
but I am not sure about that.


If you can submit the job to a cluster as a shaded jar, could you share the
project's pom settings and sample test code?
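
One way to test the classpath theory, offered as a rough sketch rather than a confirmed diagnosis, is to list which packages the shaded jar actually bundles. Flink's planner compiles its generated code with Janino, so a second copy of the Janino or planner classes inside the job jar is worth ruling out. The function name `packages_in_jar` and the prefixes below are illustrative assumptions, not something taken from this thread.

```python
import zipfile
from collections import Counter


def packages_in_jar(jar_path, prefixes):
    """Count .class entries in a jar whose path starts with one of the prefixes.

    A jar is a zip archive, so the standard zipfile module can read it.
    Prefixes use '/' separators, e.g. "org/codehaus/janino/".
    """
    counts = Counter()
    with zipfile.ZipFile(jar_path) as jar:
        for name in jar.namelist():
            if not name.endswith(".class"):
                continue  # skip resources, manifests, directories
            for prefix in prefixes:
                if name.startswith(prefix):
                    counts[prefix] += 1
    return dict(counts)


# Hypothetical prefixes to look for; adjust them to the dependencies in your pom.
SUSPECT_PREFIXES = [
    "org/codehaus/janino/",
    "org/apache/flink/table/",
]
```

Running `packages_in_jar` on the shaded jar (whatever path your build produces) with `SUSPECT_PREFIXES` returns only the prefixes that are present. A non-empty result does not prove a conflict by itself; it only shows which dependencies would be worth marking as `provided` and retesting.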







At 2020-03-02 20:36:06, "Benchao Li"  wrote:
>Hi fulin,
>
>I cannot reproduce your exception on the current master using your SQL. I
>searched for the error message, and this issue [1] seems similar to
>yours, but the current compile util does not appear to have this problem.
>
>BTW, are you using 1.10?
>
>[1] https://issues.apache.org/jira/browse/FLINK-7490
>
>sunfulin wrote on Mon, Mar 2, 2020 at 11:17 AM:
>
>>
>>
>>
>> create table lscsp_sc_order_all (
>>   amount varchar,
>>   argType varchar,
>>   balance varchar,
>>   branchNo varchar,
>>   businessType varchar,
>>   channelType varchar,
>>   counterOrderNo varchar,
>>   counterRegisteredDate varchar,
>>   custAsset varchar,
>>   customerNumber varchar,
>>   customerType varchar,
>>   discountId varchar,
>>   doubleRecordFlag varchar,
>>   doubleRecordType varchar,
>>   exceedFlag varchar,
>>   fundAccount varchar,
>>   fundCode varchar,
>>   fundCompany varchar,
>>   fundName varchar,
>>   fundRecruitmentFlag varchar,
>>   id varchar,
>>   lastUpdateTime varchar,
>>   opBranchNo varchar,
>>   opStation varchar,
>>   orderNo varchar,
>>   orgEntrustNo varchar,
>>   orgOrderNo varchar,
>>   prodId varchar,
>>   prodInvestorType varchar,
>>   prodLeafType varchar,
>>   prodRisk varchar,
>>   prodRiskFlag varchar,
>>   prodRootType varchar,
>>   prodTerm varchar,
>>   prodVariety varchar,
>>   quaInvestorFlag varchar,
>>   quaInvestorSource varchar,
>>   quickPurchaseFlag varchar,
>>   remark varchar,
>>   remark1 varchar,
>>   remark2 varchar,
>>   remark3 varchar,
>>   riskFlag varchar,
>>   scRcvTime varchar,
>>   scSendTime varchar,
>>   signId varchar,
>>   signSpecialRiskFlag varchar,
>>   source varchar,
>>   status varchar,
>>   subRiskFlag varchar,
>>   sysNodeId varchar,
>>   taSerialNo varchar,
>>   termFlag varchar,
>>   token varchar,
>>   tradeConfirmDate varchar,
>>   transFundCode varchar,
>>   transProdId varchar,
>>   varietyFlag varchar,
>>   zlcftProdType varchar,
>>   proctime as PROCTIME()  -- generate a processing-time column via a computed column
>> )
>> with (
>>   'connector.type' = 'kafka',  -- use the kafka connector
>>   'connector.version' = '0.10',  -- kafka version; universal supports 0.11 and above
>>   'connector.topic' = '',  -- kafka topic
>>   'connector.startup-mode' = 'group-offsets',  -- start reading from the starting offset
>>   'connector.properties.zookeeper.connect' = '',  -- zookeeper address
>>   'connector.properties.bootstrap.servers' = '',  -- kafka broker address
>>   'connector.properties.group.id' = 'acrm-realtime-saleorder-consumer-1',
>>   'format.type' = 'json'  -- source data format is json
>> )
>>
>>
>> CREATE TABLE dim_app_cust_info (
>>   cust_id varchar,
>>   open_comp_name varchar,
>>   open_comp_id varchar,
>>   org_name varchar,
>>   org_id varchar,
>>   comp_name varchar,
>>   comp_id varchar,
>>   mng_name varchar,
>>   mng_id varchar,
>>   is_tg varchar,
>>   cust_name varchar,
>>   cust_type varchar,
>>   avg_tot_aset_y365 double,
>>   avg_aset_create_y double
>> ) WITH (
>>   'connector.type' = 'jdbc',
>>   'connector.url' = '',
>>   'connector.table' = 'app_cust_serv_rel_info',
>>   'connector.driver' = 'com.mysql.jdbc.Driver',
>>   'connector.username' = 'admin',
>>   'connector.password' = 'Windows7',
>>   'connector.lookup.cache.max-rows' = '8000',
>>   'connector.lookup.cache.ttl' = '30min',
>>   'connector.lookup.max-retries' = '3'
>> )
>>
>>
>>
>> At 2020-03-02 09:16:05, "Benchao Li"  wrote:
>> >Could you also provide the DDL for lscsp_sc_order_all
>> >and dim_app_cust_info?
>> >
>> >sunfulin wrote on Sun, Mar 1, 2020 at 9:22 PM:
>> >
>> >>
>> >> CREATE TABLE realtime_product_sell (
>> >>   sor_pty_id varchar,
>> >>   entrust_date varchar,
>> >>   entrust_time varchar,
>> >>   product_code varchar,
>> >>   business_type varchar,
>> >>   balance double,
>> >>   cust_name varchar,
>> >>   open_comp_name varchar,
>> >>   open_comp_id varchar,
>> >>   org_name varchar,
>> >>   org_id varchar,
>> >>   comp_name varchar,
>> >>   comp_id varchar,
>> >>   mng_name varchar,
>> >>   mng_id varchar,
>> >>   is_tg varchar,
>> >>   cust_type varchar,
>> >>   avg_tot_aset_y365 double,
>> >>   avg_aset_create_y double
>> >> ) WITH (
>> >>   'connector.type' = 'elasticsearch',
>> >>   'connector.version' = '',
>> >>   'connector.hosts' =
