Re:Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-01 Thread sunfulin
Hi, Benchao,
Thanks for the reply. 


Could you provide us with more information?
1. Which planner are you using, the Blink or the legacy planner?
I am using the Blink planner. I have not tested with the legacy planner because my program 
depends on many new features of the Blink planner.
2. How do you register your UDF?
Just this code: tableEnv.registerFunction("ts2Date", new ts2Date());
tableEnv is a StreamTableEnvironment.
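The thread does not include the UDF's source, but for context, the formatting logic of a ts2Date-style scalar function might look like the plain-Java sketch below. The class name, the semantics of the boolean flag (true for a date string, false for a time string), and the UTC zone are all assumptions; in Flink the real class would extend org.apache.flink.table.functions.ScalarFunction and expose this as its eval method.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch of a ts2Date-style UDF's core logic (not the author's code).
// In Flink this would extend org.apache.flink.table.functions.ScalarFunction.
public class Ts2Date {
    private static final DateTimeFormatter DATE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"));
    private static final DateTimeFormatter TIME =
            DateTimeFormatter.ofPattern("HH:mm:ss").withZone(ZoneId.of("UTC"));

    // asDate = true  -> "yyyy-MM-dd" (assumed to back entrust_date)
    // asDate = false -> "HH:mm:ss"   (assumed to back entrust_time)
    public static String eval(long epochMillis, boolean asDate) {
        Instant ts = Instant.ofEpochMilli(epochMillis);
        return (asDate ? DATE : TIME).format(ts);
    }
}
```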
3. Does this have a relation to checkpointing? What happens if you enable checkpointing but do not use your UDF? And if you disable checkpointing and use the UDF?
I don't think this is related to checkpointing itself. If I enable checkpointing and do not 
use my UDF, I see no exception and the job submits successfully. If I disable checkpointing 
and use the UDF, the job also submits successfully. 


I have dug into this exception quite a bit. Maybe it is related to some classloader 
issue. I would appreciate your suggestions. 


On 2020-03-01 17:54:03, "Benchao Li" wrote:

Hi fulin,


It seems like a bug in the code generation.


Could you provide us with more information?
1. Which planner are you using, the Blink or the legacy planner?
2. How do you register your UDF?
3. Does this have a relation to checkpointing? What happens if you enable checkpointing but do not use your UDF? And if you disable checkpointing and use the UDF?


sunfulin wrote on Sunday, March 1, 2020 at 5:41 PM:

Hi, guys
I am running a Flink 1.10 SQL job with an UpsertStreamSink to Elasticsearch. In my 
SQL logic, I use a UDF named ts2Date to handle date formatting of stream fields. 
However, when I add `env.enableCheckpointing(time)`, the job fails to submit and 
throws the exception below. This is really weird, because when I remove the UDF the 
job submits successfully. Any suggestion is highly appreciated. My SQL logic is as 
follows: 



INSERT INTO realtime_product_sell
SELECT U.sor_pty_id,
       U.entrust_date,
       U.entrust_time,
       U.product_code,
       U.business_type,
       SUM(CAST(U.balance AS DOUBLE)) AS balance,
       COALESCE(C.cust_name, '--') AS cust_name,
       COALESCE(C.open_comp_name, '--') AS open_comp_name,
       COALESCE(C.open_comp_id, '--') AS open_comp_id,
       COALESCE(C.org_name, '--') AS org_name,
       COALESCE(C.org_id, '--') AS comp_name,
       COALESCE(C.comp_name, '--') AS comp_name,
       COALESCE(C.comp_id, '--') AS comp_id,
       COALESCE(C.mng_name, '--') AS mng_name,
       COALESCE(C.mng_id, '--') AS mng_id,
       COALESCE(C.is_tg, '--') AS is_tg,
       COALESCE(C.cust_type, '--') AS cust_type,
       COALESCE(C.avg_tot_aset_y365, 0.00) AS avg_tot_aset_y365,
       COALESCE(C.avg_aset_create_y, 0.00) AS avg_aset_create_y
FROM
  (SELECT customerNumber AS sor_pty_id,
          ts2Date(`lastUpdateTime`, true) AS entrust_date,   -- the UDF
          ts2Date(`lastUpdateTime`, false) AS entrust_time,  -- the UDF
          fundCode AS product_code,
          businessType AS business_type,
          balance,
          proctime
   FROM lscsp_sc_order_all
   WHERE fundCode IN ('007118', '007117') AND businessType IN ('5')) AS U
LEFT JOIN dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
  ON U.sor_pty_id = C.cust_id
GROUP BY sor_pty_id,
         entrust_date,
         entrust_time,
         product_code,
         business_type,
         COALESCE(C.cust_name, '--'),
         COALESCE(C.open_comp_name, '--'),
         COALESCE(C.open_comp_id, '--'),
         COALESCE(C.org_name, '--'),
         COALESCE(C.org_id, '--'),
         COALESCE(C.comp_name, '--'),
         COALESCE(C.comp_id, '--'),
         COALESCE(C.mng_name, '--'),
         COALESCE(C.mng_id, '--'),
         COALESCE(C.is_tg, '--'),
         COALESCE(C.cust_type, '--'),
         COALESCE(C.avg_tot_aset_y365, 0.00),
         COALESCE(C.avg_aset_create_y, 0.00)




2020-03-01 17:22:06,504 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
	at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
	at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
	at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
	at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
	at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
	at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
	at 
