Re: Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-02, posted by Benchao Li
I just ran it in my IDE.

sunfulin wrote on Mon, Mar 2, 2020 at 9:04 PM:

>
>
> Hi,
> Yep, I am using 1.10.
> Did you submit the job to the cluster or just run it in your IDE? I can
> also run it successfully in my IDE, but it cannot run on the cluster as a
> shaded jar. So I think the problem may be related to the maven jar
> classpath, but I am not sure about that.
>
> If you can submit the job as a shaded jar to a cluster, could you share
> the project pom settings and sample test code?

Re: Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-02, posted by sunfulin




Hi,
Yep, I am using 1.10.
Did you submit the job to the cluster or just run it in your IDE? I can
also run it successfully in my IDE, but it cannot run on the cluster as a
shaded jar. So I think the problem may be related to the maven jar
classpath, but I am not sure about that.

If you can submit the job as a shaded jar to a cluster, could you share
the project pom settings and sample test code?

Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-02, posted by Benchao Li
Hi fulin,

I cannot reproduce your exception on the current master using your SQLs. I
searched the error message; this issue [1] seems similar to yours, but the
current compile util does not seem to have that problem.

BTW, are you using 1.10?

[1] https://issues.apache.org/jira/browse/FLINK-7490


Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01, posted by sunfulin







create table lscsp_sc_order_all (
  amount varchar  ,
  argType varchar,
  balance varchar,
  branchNo varchar  ,
  businessType varchar ,
  channelType varchar ,
  counterOrderNo varchar  ,
  counterRegisteredDate varchar,
  custAsset varchar  ,
  customerNumber varchar,
  customerType varchar,
  discountId varchar,
  doubleRecordFlag varchar,
  doubleRecordType varchar,
  exceedFlag varchar,
  fundAccount varchar,
  fundCode varchar,
  fundCompany varchar,
  fundName varchar,
  fundRecruitmentFlag varchar,
  id varchar,
  lastUpdateTime varchar,
  opBranchNo varchar,
  opStation varchar,
  orderNo varchar,
  orgEntrustNo varchar,
  orgOrderNo varchar,
  prodId varchar,
  prodInvestorType varchar,
  prodLeafType varchar,
  prodRisk varchar,
  prodRiskFlag varchar,
  prodRootType varchar,
  prodTerm varchar,
  prodVariety varchar,
  quaInvestorFlag varchar,
  quaInvestorSource varchar,
  quickPurchaseFlag varchar,
  remark varchar,
  remark1 varchar,
  remark2 varchar,
  remark3 varchar,
  riskFlag varchar,
  scRcvTime varchar,
  scSendTime varchar,
  signId varchar,
  signSpecialRiskFlag varchar,
  source varchar,
  status varchar,
  subRiskFlag varchar,
  sysNodeId varchar,
  taSerialNo varchar,
  termFlag varchar,
  token varchar,
  tradeConfirmDate varchar,
  transFundCode varchar,
  transProdId varchar,
  varietyFlag varchar,
  zlcftProdType varchar,
  proctime as PROCTIME()   -- computed column that provides a processing-time attribute
)
with
(
  'connector.type' = 'kafka',  -- use the kafka connector
  'connector.version' = '0.10',  -- kafka version; 'universal' supports 0.11 and above
  'connector.topic' = '',  -- kafka topic
  'connector.startup-mode' = 'group-offsets',  -- start from the committed group offsets
  'connector.properties.zookeeper.connect' = '',  -- zookeeper address
  'connector.properties.bootstrap.servers' = '',  -- kafka broker address
  'connector.properties.group.id' = 'acrm-realtime-saleorder-consumer-1',
  'format.type' = 'json'  -- source format is json
)







CREATE TABLE dim_app_cust_info (
cust_id varchar ,
open_comp_name varchar ,
open_comp_id varchar ,
org_name varchar ,
org_id varchar,
comp_name varchar ,
comp_id varchar ,
mng_name varchar ,
mng_id varchar ,
is_tg varchar ,
cust_name varchar ,
cust_type varchar,
avg_tot_aset_y365 double ,
avg_aset_create_y double
) WITH (
'connector.type' = 'jdbc',
'connector.url' = '',
'connector.table' = 'app_cust_serv_rel_info',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'admin',
'connector.password' = 'Windows7',
'connector.lookup.cache.max-rows' = '8000',
'connector.lookup.cache.ttl' = '30min',
'connector.lookup.max-retries' = '3'
)
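
For context, a minimal sketch of how these tables and the UDF are wired
together on Flink 1.10 with the blink planner. The DDL and insert strings
are the ones posted in this thread (bodies elided here), the ts2Date UDF is
the one posted below, and the class name, checkpoint interval, and job name
are illustrative assumptions:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class SaleOrderJob {

    // SQL strings exactly as posted in this thread (bodies elided here)
    private static final String KAFKA_SOURCE_DDL = "create table lscsp_sc_order_all (...)";
    private static final String JDBC_DIM_DDL = "CREATE TABLE dim_app_cust_info (...)";
    private static final String ES_SINK_DDL = "CREATE TABLE realtime_product_sell (...)";
    private static final String INSERT_SQL = "INSERT INTO realtime_product_sell select ...";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);  // the call that triggers the reported failure

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // register the UDF under the name used in the insert statement
        tableEnv.registerFunction("ts2Date", new ts2Date());

        // source, dimension, and sink tables, then the insert job itself
        tableEnv.sqlUpdate(KAFKA_SOURCE_DDL);
        tableEnv.sqlUpdate(JDBC_DIM_DDL);
        tableEnv.sqlUpdate(ES_SINK_DDL);
        tableEnv.sqlUpdate(INSERT_SQL);

        tableEnv.execute("acrm-realtime-saleorder");
    }
}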








At 2020-03-02 09:16:05, "Benchao Li"  wrote:
>Could you also provide us the DDL for lscsp_sc_order_all
>and dim_app_cust_info?

Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01, posted by Benchao Li
Could you also provide us the DDL for lscsp_sc_order_all
and dim_app_cust_info?

sunfulin wrote on Sun, Mar 1, 2020 at 9:22 PM:

>
> CREATE TABLE realtime_product_sell (
>   sor_pty_id varchar,
>   entrust_date varchar,
>   entrust_time varchar,
>   product_code varchar,
>   business_type varchar,
>   balance double,
>   cust_name varchar,
>   open_comp_name varchar,
>   open_comp_id varchar,
>   org_name varchar,
>   org_id varchar,
>   comp_name varchar,
>   comp_id varchar,
>   mng_name varchar,
>   mng_id varchar,
>   is_tg varchar,
>   cust_type varchar,
>   avg_tot_aset_y365 double,
>   avg_aset_create_y double
> ) WITH (
>   'connector.type' = 'elasticsearch',
>   'connector.version' = '',
>   'connector.hosts' = '',
>   'connector.index' = 'realtime_product_sell_007118',
>   'connector.document-type' = '_doc',
>   'update-mode' = 'upsert',
>   'connector.key-delimiter' = '$',
>   'connector.key-null-literal' = 'n/a',
>   'connector.bulk-flush.interval' = '1000',
>   'format.type' = 'json'
> )

Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01, posted by Benchao Li
The UDF looks good. Could you also paste your DDL? Then we can reproduce
your bug easily.

sunfulin wrote on Sun, Mar 1, 2020 at 6:39 PM:

> Below is the code. The function transforms the original field timeStr, e.g.
> "2020-03-01 12:01:00.234", into the target time string according to dayTag.
>
> import java.text.ParseException;
> import java.text.SimpleDateFormat;
> import java.util.Date;
>
> import org.apache.flink.table.functions.ScalarFunction;
>
> public class ts2Date extends ScalarFunction {
>
>     public String eval(String timeStr, boolean dayTag) {
>         if (timeStr == null) {
>             return null;
>         }
>         // "2020-03-01 12:01:00.234" -> "2020-03-01" or "2020-03-01T12:01:00.000+0800"
>         SimpleDateFormat ortSf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>         Date date;
>         try {
>             date = ortSf.parse(timeStr);
>         } catch (ParseException e) {
>             e.printStackTrace();
>             return null;
>         }
>         if (dayTag) {
>             SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
>             return sf.format(date);
>         } else {
>             SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:00.000+0800");
>             return sf.format(date);
>         }
>     }
> }
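
One side note on the UDF above: SimpleDateFormat is not thread-safe, which is
why it has to be re-created on every call as written. A sketch of the same
logic on java.time keeps the formatters as shared constants; the class name
Ts2DateV2 is illustrative, and the patterns and null-on-failure behavior
match the original:

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

import org.apache.flink.table.functions.ScalarFunction;

public class Ts2DateV2 extends ScalarFunction {

    // DateTimeFormatter is immutable and thread-safe, so instances can be shared
    private static final DateTimeFormatter INPUT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter MINUTE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm':00.000+0800'");

    public String eval(String timeStr, boolean dayTag) {
        if (timeStr == null) {
            return null;
        }
        try {
            LocalDateTime ts = LocalDateTime.parse(timeStr, INPUT);
            return ts.format(dayTag ? DAY : MINUTE);
        } catch (DateTimeParseException e) {
            return null;  // mirror the original's null-on-unparseable behavior
        }
    }
}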
>
>
>
> At 2020-03-01 18:14:30, "Benchao Li"  wrote:
>
> Could you show how your UDF `ts2Date` is implemented?
>
> sunfulin wrote on Sun, Mar 1, 2020 at 6:05 PM:
>
>> Hi, Benchao,
>> Thanks for the reply.
>>
>> Could you provide us more information?
>> 1. what planner are you using? blink or legacy planner?
>> I am using the Blink planner. I have not tested with the legacy planner
>> because my program depends on a lot of new features of the blink planner.
>> 2. how do you register your UDF?
>> Just this code: tableEnv.registerFunction("ts2Date", new ts2Date());
>> tableEnv is a StreamTableEnvironment.
>> 3. does this have a relation to checkpointing? what if you enable
>> checkpointing and do not use your udf, or disable checkpointing and use udf?
>> I don't think this is related to checkpointing. If I enable checkpointing
>> and do not use my UDF, I see no exception and the job submits successfully.
>> If I disable checkpointing and use the UDF, the job submits successfully
>> too.
>>
>> I dug into this exception a lot. Maybe it is related to some classloader
>> issue. Hoping for your suggestions.
>>
>>
>>
>>
>>
>> At 2020-03-01 17:54:03, "Benchao Li" wrote:
>>
>> Hi fulin,
>>
>> It seems like a bug in the code generation.
>>
>> Could you provide us more information?
>> 1. what planner are you using? blink or legacy planner?
>> 2. how do you register your UDF?
>> 3. does this have a relation to checkpointing? what if you enable
>> checkpointing and do not use your udf, or disable checkpointing and use udf?
>>
>> sunfulin wrote on Sun, Mar 1, 2020 at 5:41 PM:
>>
>>> Hi, guys
>>> I am running a Flink 1.10 SQL job with an UpsertStreamSink to ElasticSearch.
>>> In my SQL logic, I use a UDF named ts2Date to handle date-format stream
>>> fields. However, when I add `env.enableCheckpointing(time)`, my job fails
>>> to submit and throws the exception below. This is really weird, because
>>> when I remove the UDF the job submits successfully. Any suggestion is
>>> highly appreciated. Besides, my SQL logic is like:
>>>
>>> INSERT INTO realtime_product_sell
>>> select U.sor_pty_id,
>>>        U.entrust_date,
>>>        U.entrust_time,
>>>        U.product_code,
>>>        U.business_type,
>>>        sum(cast(U.balance as double)) as balance,
>>>        COALESCE(C.cust_name, '--') as cust_name,
>>>        COALESCE(C.open_comp_name, '--') AS open_comp_name,
>>>        COALESCE(C.open_comp_id, '--') as open_comp_id,
>>>        COALESCE(C.org_name, '--') as org_name,
>>>        COALESCE(C.org_id, '--') as org_id,
>>>        COALESCE(C.comp_name, '--') AS comp_name,
>>>        COALESCE(C.comp_id, '--') as comp_id,
>>>        COALESCE(C.mng_name, '--') as mng_name,
>>>        COALESCE(C.mng_id, '--') as mng_id,
>>>        COALESCE(C.is_tg, '--') as is_tg,
>>>        COALESCE(C.cust_type, '--') as cust_type,
>>>        COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365,
>>>        COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y
>>> from (select customerNumber as sor_pty_id,
>>>              ts2Date(`lastUpdateTime`, true) as entrust_date,   -- the UDF
>>>              ts2Date(`lastUpdateTime`, false) as entrust_time,  -- the UDF
>>>              fundCode as product_code,
>>>              businessType as business_type,
>>>              balance,
>>>              proctime
>>>       from lscsp_sc_order_all
>>>       where fundCode in ('007118', '007117') and businessType in ('5')) as U
>>> left join dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
>>> on U.sor_pty_id = C.cust_id
>>> group by sor_pty_id,
>>>          entrust_date,
>>>          entrust_time,
>>>          product_code,
>>>          business_type,
>>>          COALESCE(C.cust_name, '--'),
>>>          COALESCE(C.open_comp_name, '--'),
>>>          COALESCE(C.open_comp_id, '--'),