我看到这个 SQL 是对维表的 join on 字段进行 cast,想问一下能否改为对主表(X_NEWS_TCRNW0003_1_ALL_CDC)的字段进行 cast 试试呢?
Shengkai Alibaba yinghua...@163.com <yinghua...@163.com> 于2022年10月24日周一 09:27写道: > 出错时SQL如下: > create table X_NEWS_TCRNW0003_1_ALL_CDC > ( > ID bigint, > NewsCode bigint, > NewsDate timestamp, > ITCode2 string, > CompanyCode string, > CRNW0003_001 int, > CRNW0003_002 int, > CRNW0003_003 int, > CRNW0003_004 string, > CRNW0003_005 decimal, > CRNW0003_006 decimal, > CRNW0003_007 string, > CRNW0003_010 int, > ProStatus tinyint, > Flag tinyint, > TMStamp string, > FWDT timestamp, > EntryDT timestamp, > ENTRYDATE timestamp, > ENTRYTIME string, > LASTUPDT timestamp, > ITName string, > pt AS PROCTIME(), > PRIMARY KEY(ID, CRNW0003_001) NOT ENFORCED > ) WITH ( > 'connector' = 'mysql-cdc', > 'hostname' = '10.88.200.XX', > 'port' = '3306', > 'username' = 'root', > 'password' = 'XXX', > 'database-name'='dzhdb', > 'table-name'='X_NEWS_TCRNW0003_1_ALL', > 'debezium.query.fetch.size'= '1024', > 'scan.incremental.snapshot.chunk.size'= '1024', > 'debezium.min.row.count.to.stream.result' = '500', > 'debezium.snapshot.delay.ms' = '1000', > 'debezium.snapshot.fetch.size'='1024', > 'scan.startup.mode'='latest-offset' > ); > > > CREATE TABLE X_NEWS_TCRNW0001_STATIC( > ID bigint, > NewsCode bigint, > CRNW0001_001 int, > CRNW0001_002 timestamp, > CRNW0001_003 varchar, > CRNW0001_007 varchar, > Flag tinyint, > EntryDT timestamp, > LASTUPDT timestamp, > PRIMARY KEY (ID) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' = > 'jdbc:mysql://10.88.200.XX:3306/dzhdb?characterEncoding=utf8&autoReconnect=true&useSSL=false', > 'table-name' = 'X_NEWS_TCRNW0001', > 'username' = 'root', > 'driver' = 'com.mysql.jdbc.Driver', > 'password' = 'XXX', > 'lookup.cache.max-rows' = '1000', > 'lookup.cache.ttl' = '60s' > ); > > CREATE TABLE X_NEWS_TCRNWITCODE_STATIC( > ID bigint, > ITCode2 varchar, > ITCode varchar, > ITName varchar, > ITAName varchar, > CR0001_007 varchar, > CR0001_008 varchar, > CR0001_031 varchar, > Flag tinyint, > PRIMARY KEY (ID) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' 
= > 'jdbc:mysql://10.88.200.XX:3306/dzhdb?characterEncoding=utf8&autoReconnect=true&useSSL=false', > 'table-name' = 'X_NEWS_TCRNWITCODE', > 'username' = 'root', > 'driver' = 'com.mysql.jdbc.Driver', > 'password' = 'XXX', > 'lookup.cache.max-rows' = '1000', > 'lookup.cache.ttl' = '60s' > ); > > CREATE TABLE X_NEWS_INDEX_TREE_STATIC( > ID bigint, > INDEXCODE varchar, > INDEXNAME varchar, > INDEX_F_CODE varchar, > NOWSTATUS int, > Flag int, > PRIMARY KEY(ID, INDEXCODE, INDEX_F_CODE) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' = > 'jdbc:mysql://10.88.200.XX:3306/dzhdb?characterEncoding=utf8&autoReconnect=true&useSSL=false', > 'table-name' = 'X_NEWS_INDEX_TREE', > 'username' = 'root', > 'driver' = 'com.mysql.jdbc.Driver', > 'password' = 'XXX', > 'lookup.cache.max-rows' = '1000', > 'lookup.cache.ttl' = '60s' > ); > > > create table news_org_pntt > ( > jour_no bigint, > news_id bigint, > news_time timestamp, > news_titl varchar, > news_sour varchar, > corp_name varchar, > infln_org_corp_name varchar, > indx_id int, > emot varchar, > impt_prop int, > type int, > infln_coef decimal, > mult_infln decimal, > corp_cd varchar, > infln_org_corp_cd varchar, > data_clas int, > pntt_clas varchar, > rela_clas VARCHAR, > rela_rule VARCHAR, > flag tinyint, > inpt_time timestamp, > last_updt_dt timestamp, > bigd_updt_dt timestamp, > PRIMARY KEY(jour_no) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' = > 'jdbc:mysql://10.88.200.XX:3306/yq?characterEncoding=utf8&autoReconnect=true&useSSL=false', > 'table-name' = 'news_org_pntt', > 'username' = 'app_yq', > 'driver' = 'com.mysql.jdbc.Driver', > 'password' = 'XXX', > 'sink.buffer-flush.max-rows' = '2000', > 'sink.buffer-flush.interval' = '2000' > ); > > CREATE VIEW tmp_X_NEWS_INDEX_TREE AS > SELECT > ID , > cast(INDEXCODE as int) as INDEXCODE, > INDEXNAME , > cast(INDEX_F_CODE as int) as INDEX_F_CODE , > NOWSTATUS , > Flag > from X_NEWS_INDEX_TREE_STATIC; > > CREATE VIEW tmp_X_NEWS_TCRNW0003_1_ALL AS > SELECT > a.ID as 
jour_no, > a.NewsCode as news_id, > n_01.CRNW0001_002 as news_time, > n_01.CRNW0001_003 as news_titl, > n_01.CRNW0001_007 as news_sour, > a.ITName as corp_name, > d.ITName as infln_org_corp_name, > a.CRNW0003_001 as indx_id, > case > when > a.CRNW0003_002 = 0 > then '一般' > when > a.CRNW0003_002 > 0 > then '正面' > when > a.CRNW0003_002 < 0 > then '预警' > end as emot, > a.CRNW0003_002 as impt_prop, > a.CRNW0003_003 as type, > a.CRNW0003_005 as infln_coef, > a.CRNW0003_006 as mult_infln, > a.CompanyCode as corp_cd, > d.ITCode as infln_org_corp_cd, > a.CRNW0003_010 as data_clas, > a.CRNW0003_007 as pntt_clas, > e.INDEXNAME as rela_clas, > c.INDEXNAME as rela_rule, > a.Flag as flag , > a.EntryDT as inpt_time, > a.LASTUPDT as last_updt_dt > from X_NEWS_TCRNW0003_1_ALL_CDC as a > left join X_NEWS_TCRNW0001_STATIC FOR SYSTEM_TIME AS OF a.pt AS n_01 on > a.NewsCode = n_01.NewsCode > left join X_NEWS_TCRNWITCODE_STATIC FOR SYSTEM_TIME AS OF a.pt AS d on > a.CRNW0003_004 = d.ITCode2 > left join tmp_X_NEWS_INDEX_TREE FOR SYSTEM_TIME AS OF a.pt AS c on > c.INDEXCODE = a.CRNW0003_001 > left join tmp_X_NEWS_INDEX_TREE FOR SYSTEM_TIME AS OF a.pt AS e on > c.INDEX_F_CODE = e.INDEXCODE > where n_01.Flag <> 1 and d.Flag <> 1 and c.Flag <> 1 and e.Flag <> 1 ; > > insert into news_org_pntt > select > jour_no , > news_id , > news_time , > news_titl , > news_sour , > corp_name , > infln_org_corp_name, > indx_id , > emot , > impt_prop , > type , > infln_coef , > mult_infln , > corp_cd , > infln_org_corp_cd , > data_clas , > pntt_clas , > rela_clas , > rela_rule , > flag , > inpt_time , > last_updt_dt , > now() > from tmp_X_NEWS_TCRNW0003_1_ALL; > > 发生错误时堆栈如下: > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: The main > method caused an error: Temporal table join requires an equality condition > on fields of table [cpp_flink_catalog.flink.X_NEWS_INDEX_TREE_STATIC]. 
> at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) > at > org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1764) > at > org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) > Caused by: org.apache.flink.table.api.TableException: Temporal table join > requires an equality condition on fields of table > [cpp_flink_catalog.flink.X_NEWS_INDEX_TREE_STATIC]. 
> at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.validate(CommonExecLookupJoin.java:481) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.translateToPlanInternal(CommonExecLookupJoin.java:217) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70) > at > 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752) > at > org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:124) > at com.cgws.ccp.flink.sql.submit.SqlSubmit.run(SqlSubmit.java:177) > at com.cgws.ccp.flink.sql.submit.SqlSubmit.main(SqlSubmit.java:106) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) > ... 11 more > Concurrent marking: > 0 init marks: total time = 0.00 s (avg = 0.00 ms). > 3 remarks: total time = 0.07 s (avg = 22.53 ms). > [std. dev = 12.26 ms, max = 39.81 ms] > 3 final marks: total time = 0.00 s (avg = 1.48 ms). > [std. dev = 1.34 ms, max = 3.37 ms] > 3 weak refs: total time = 0.06 s (avg = 21.05 ms). > [std. dev = 10.93 ms, max = 36.43 ms] > 3 cleanups: total time = 0.01 s (avg = 2.00 ms). > [std. dev = 0.11 ms, max = 2.13 ms] > Final counting total time = 0.00 s (avg = 0.49 ms). > RS scrub total time = 0.00 s (avg = 0.61 ms). > Total stop_world time = 0.07 s. > Total concurrent time = 0.43 s ( 0.07 s marking). 
> > at > com.cgws.ccp.flink.job.FlinkJobSubmitter.doExecute(FlinkJobSubmitter.java:233) > at > com.cgws.ccp.flink.job.FlinkJobSubmitter.execute(FlinkJobSubmitter.java:240) > at com.cgws.ccp.server.jobs.JobManager.submit(JobManager.java:189) > at com.cgws.ccp.server.jobs.JobManager.submit(JobManager.java:156) > at com.cgws.ccp.server.jobs > .transitions.StartTransitionCallback.startJob(StartTransitionCallback.java:221) > at com.cgws.ccp.server.jobs > .transitions.StartTransitionCallback.access$700(StartTransitionCallback.java:27) > at com.cgws.ccp.server.jobs > .transitions.StartTransitionCallback$StartThread.adjustStatusAndStart(StartTransitionCallback.java:147) > at com.cgws.ccp.server.jobs > .transitions.StartTransitionCallback$StartThread.run(StartTransitionCallback.java:122) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > 还有其他的情况,比如2表join的字段:一个是int,一个是bigint,也会报上述错误,请帮忙看下是,万分感谢! > > > > > > > 发件人: Leonard Xu > 发送时间: 2022-10-22 16:58 > 收件人: user-zh > 主题: Re: Flink SQL 问题请教 > 你好,你的Flink 版本多少呀?我记得低版本才有这个问题。 > 另外SQL可以贴下嘛? > > 祝好, > Leonard > > > > > 2022年10月22日 上午11:11,邮件帮助中心 <yinghua...@163.com> 写道: > > > > 大家好! > > 最近在开发一个项目时,在使用CDC表和维表表做Temporal Table > JOIN时,发现2个表Join时join字段的类型必须一致,否则提交时提示如下的错误 > > The main method caused an error: Temporal table join requires an > equality condition on fields of table..... 
> > 为了解决上述问题,我们做了如下尝试: > > 1:在join时,对维表要关联的字段使用cast转换,如: JOIN ON CAST(tableA.fieldA AS > INT) = cdc_table_b.fieldB,将2个关联表的关联字段类型保持一致 > > 2:在维表上建立一个视图,在视图定义字段的类型和select时使用cast转换,然后视图和cdc表进行join, > 此时join时字段类型理论上是一致的, > > 很可惜,上述2个解决办法未能解决问题,都是提示上述同样的错误(The main method caused an error: > Temporal table join requires an equality condition on fields of > table),如果在DDL中将维表要join的字段和CDC表join的字段定义成相同的类型时,提交时不报上述错误,但在运行过程中处理数据时会出现castException,请教下大家上述问题可以怎么解决?不胜感激! > >