I see that in this SQL the CAST is applied to the dimension table's join key in the JOIN ON clause. Could you try casting the corresponding field of the main table (X_NEWS_TCRNW0003_1_ALL_CDC) instead?
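
For example, something like this (just a rough, untested sketch that joins X_NEWS_INDEX_TREE_STATIC directly instead of the view, and assumes, per your DDL, that INDEXCODE is VARCHAR while a.CRNW0003_001 is INT):

left join X_NEWS_INDEX_TREE_STATIC FOR SYSTEM_TIME AS OF a.pt AS c
    -- the CAST sits on the main table's column, so the lookup key
    -- c.INDEXCODE is still referenced as a plain field of the dimension table
    on CAST(a.CRNW0003_001 AS STRING) = c.INDEXCODE

The same idea may be worth trying for the INT vs BIGINT case you mentioned: cast the main table's column so it matches the dimension table's declared type.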

Shengkai
Alibaba

yinghua...@163.com <yinghua...@163.com> wrote on Mon, Oct 24, 2022 at 09:27:

> The SQL that triggers the error is as follows:
> create table X_NEWS_TCRNW0003_1_ALL_CDC
> (
>   ID  bigint,
>   NewsCode  bigint,
>   NewsDate  timestamp,
>   ITCode2  string,
>   CompanyCode  string,
>   CRNW0003_001  int,
>   CRNW0003_002  int,
>   CRNW0003_003  int,
>   CRNW0003_004  string,
>   CRNW0003_005  decimal,
>   CRNW0003_006  decimal,
>   CRNW0003_007  string,
>   CRNW0003_010  int,
>   ProStatus  tinyint,
>   Flag  tinyint,
>   TMStamp  string,
>   FWDT  timestamp,
>   EntryDT  timestamp,
>   ENTRYDATE  timestamp,
>   ENTRYTIME  string,
>   LASTUPDT  timestamp,
>   ITName  string,
>   pt AS PROCTIME(),
>   PRIMARY KEY(ID, CRNW0003_001) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = '10.88.200.XX',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = 'XXX',
>   'database-name'='dzhdb',
>   'table-name'='X_NEWS_TCRNW0003_1_ALL',
>   'debezium.query.fetch.size'= '1024',
>   'scan.incremental.snapshot.chunk.size'= '1024',
>   'debezium.min.row.count.to.stream.result' = '500',
>   'debezium.snapshot.delay.ms' = '1000',
>   'debezium.snapshot.fetch.size'='1024',
>   'scan.startup.mode'='latest-offset'
> );
>
>
> CREATE TABLE X_NEWS_TCRNW0001_STATIC(
>   ID  bigint,
>   NewsCode  bigint,
>   CRNW0001_001  int,
>   CRNW0001_002  timestamp,
>   CRNW0001_003  varchar,
>   CRNW0001_007  varchar,
>   Flag  tinyint,
>   EntryDT  timestamp,
>   LASTUPDT  timestamp,
>   PRIMARY KEY (ID) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://10.88.200.XX:3306/dzhdb?characterEncoding=utf8&autoReconnect=true&useSSL=false',
>   'table-name' = 'X_NEWS_TCRNW0001',
>   'username' = 'root',
>   'driver' = 'com.mysql.jdbc.Driver',
>   'password' = 'XXX',
>   'lookup.cache.max-rows' = '1000',
>   'lookup.cache.ttl' = '60s'
> );
>
> CREATE TABLE X_NEWS_TCRNWITCODE_STATIC(
>   ID  bigint,
>   ITCode2  varchar,
>   ITCode  varchar,
>   ITName  varchar,
>   ITAName  varchar,
>   CR0001_007  varchar,
>   CR0001_008  varchar,
>   CR0001_031  varchar,
>   Flag  tinyint,
>   PRIMARY KEY (ID) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://10.88.200.XX:3306/dzhdb?characterEncoding=utf8&autoReconnect=true&useSSL=false',
>   'table-name' = 'X_NEWS_TCRNWITCODE',
>   'username' = 'root',
>   'driver' = 'com.mysql.jdbc.Driver',
>   'password' = 'XXX',
>   'lookup.cache.max-rows' = '1000',
>   'lookup.cache.ttl' = '60s'
> );
>
> CREATE TABLE X_NEWS_INDEX_TREE_STATIC(
>   ID  bigint,
>   INDEXCODE  varchar,
>   INDEXNAME  varchar,
>   INDEX_F_CODE  varchar,
>   NOWSTATUS  int,
>   Flag  int,
>   PRIMARY KEY(ID, INDEXCODE, INDEX_F_CODE) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://10.88.200.XX:3306/dzhdb?characterEncoding=utf8&autoReconnect=true&useSSL=false',
>   'table-name' = 'X_NEWS_INDEX_TREE',
>   'username' = 'root',
>   'driver' = 'com.mysql.jdbc.Driver',
>   'password' = 'XXX',
>   'lookup.cache.max-rows' = '1000',
>   'lookup.cache.ttl' = '60s'
> );
>
>
> create table news_org_pntt
> (
>   jour_no  bigint,
>   news_id  bigint,
>   news_time  timestamp,
>   news_titl  varchar,
>   news_sour  varchar,
>   corp_name  varchar,
>   infln_org_corp_name  varchar,
>   indx_id  int,
>   emot  varchar,
>   impt_prop  int,
>   type  int,
>   infln_coef  decimal,
>   mult_infln  decimal,
>   corp_cd  varchar,
>   infln_org_corp_cd  varchar,
>   data_clas  int,
>   pntt_clas  varchar,
>   rela_clas  VARCHAR,
>   rela_rule  VARCHAR,
>   flag  tinyint,
>   inpt_time  timestamp,
>   last_updt_dt  timestamp,
>   bigd_updt_dt  timestamp,
>   PRIMARY KEY(jour_no) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://10.88.200.XX:3306/yq?characterEncoding=utf8&autoReconnect=true&useSSL=false',
>   'table-name' = 'news_org_pntt',
>   'username' = 'app_yq',
>   'driver' = 'com.mysql.jdbc.Driver',
>   'password' = 'XXX',
>   'sink.buffer-flush.max-rows' = '2000',
>   'sink.buffer-flush.interval' = '2000'
> );
>
> CREATE VIEW tmp_X_NEWS_INDEX_TREE AS
> SELECT
>   ID,
>   cast(INDEXCODE as int) as INDEXCODE,
>   INDEXNAME,
>   cast(INDEX_F_CODE as int) as INDEX_F_CODE,
>   NOWSTATUS,
>   Flag
> from X_NEWS_INDEX_TREE_STATIC;
>
> CREATE VIEW tmp_X_NEWS_TCRNW0003_1_ALL AS
> SELECT
>    a.ID as jour_no,
>     a.NewsCode as news_id,
>     n_01.CRNW0001_002 as news_time,
>     n_01.CRNW0001_003 as news_titl,
>     n_01.CRNW0001_007 as news_sour,
>     a.ITName as corp_name,
>     d.ITName as infln_org_corp_name,
>     a.CRNW0003_001 as indx_id,
>     case
>         when
>             a.CRNW0003_002 = 0
>             then '一般'
>         when
>             a.CRNW0003_002 > 0
>             then '正面'
>         when
>             a.CRNW0003_002 < 0
>             then '预警'
>     end as emot,
>     a.CRNW0003_002 as impt_prop,
>     a.CRNW0003_003 as type,
>     a.CRNW0003_005 as infln_coef,
>     a.CRNW0003_006 as mult_infln,
>     a.CompanyCode as corp_cd,
>     d.ITCode as infln_org_corp_cd,
>     a.CRNW0003_010 as data_clas,
>     a.CRNW0003_007 as pntt_clas,
>     e.INDEXNAME as rela_clas,
>     c.INDEXNAME as rela_rule,
>     a.Flag as flag ,
>     a.EntryDT as inpt_time,
>     a.LASTUPDT as last_updt_dt
> from X_NEWS_TCRNW0003_1_ALL_CDC as a
> left join X_NEWS_TCRNW0001_STATIC FOR SYSTEM_TIME AS OF a.pt AS n_01 on
> a.NewsCode = n_01.NewsCode
> left join X_NEWS_TCRNWITCODE_STATIC FOR SYSTEM_TIME AS OF a.pt AS d on
> a.CRNW0003_004 = d.ITCode2
> left join tmp_X_NEWS_INDEX_TREE FOR SYSTEM_TIME AS OF a.pt AS c on
> c.INDEXCODE = a.CRNW0003_001
> left join tmp_X_NEWS_INDEX_TREE FOR SYSTEM_TIME AS OF a.pt AS e on
> c.INDEX_F_CODE = e.INDEXCODE
> where n_01.Flag <> 1 and d.Flag <> 1 and c.Flag <> 1 and e.Flag <> 1 ;
>
> insert into news_org_pntt
> select
> jour_no            ,
> news_id            ,
> news_time          ,
> news_titl          ,
> news_sour          ,
> corp_name          ,
> infln_org_corp_name,
> indx_id            ,
> emot               ,
> impt_prop          ,
> type               ,
> infln_coef         ,
> mult_infln         ,
> corp_cd            ,
> infln_org_corp_cd  ,
> data_clas          ,
> pntt_clas          ,
> rela_clas          ,
> rela_rule          ,
> flag               ,
> inpt_time          ,
> last_updt_dt       ,
> now()
> from tmp_X_NEWS_TCRNW0003_1_ALL;
>
> The stack trace at the time of the error is as follows:
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Temporal table join requires an equality condition on fields of table [cpp_flink_catalog.flink.X_NEWS_INDEX_TREE_STATIC].
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1764)
>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.table.api.TableException: Temporal table join requires an equality condition on fields of table [cpp_flink_catalog.flink.X_NEWS_INDEX_TREE_STATIC].
>     at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.validate(CommonExecLookupJoin.java:481)
>     at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.translateToPlanInternal(CommonExecLookupJoin.java:217)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
>     at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:124)
>     at com.cgws.ccp.flink.sql.submit.SqlSubmit.run(SqlSubmit.java:177)
>     at com.cgws.ccp.flink.sql.submit.SqlSubmit.main(SqlSubmit.java:106)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>     ... 11 more
>
>     at com.cgws.ccp.flink.job.FlinkJobSubmitter.doExecute(FlinkJobSubmitter.java:233)
>     at com.cgws.ccp.flink.job.FlinkJobSubmitter.execute(FlinkJobSubmitter.java:240)
>     at com.cgws.ccp.server.jobs.JobManager.submit(JobManager.java:189)
>     at com.cgws.ccp.server.jobs.JobManager.submit(JobManager.java:156)
>     at com.cgws.ccp.server.jobs.transitions.StartTransitionCallback.startJob(StartTransitionCallback.java:221)
>     at com.cgws.ccp.server.jobs.transitions.StartTransitionCallback.access$700(StartTransitionCallback.java:27)
>     at com.cgws.ccp.server.jobs.transitions.StartTransitionCallback$StartThread.adjustStatusAndStart(StartTransitionCallback.java:147)
>     at com.cgws.ccp.server.jobs.transitions.StartTransitionCallback$StartThread.run(StartTransitionCallback.java:122)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
>
> There are other cases as well: for example, when the join fields of two tables have different types (one INT and the other BIGINT), the same error is reported. Could you please help take a look? Many thanks!
>
>
>
>
>
>
> From: Leonard Xu
> Sent: 2022-10-22 16:58
> To: user-zh
> Subject: Re: Flink SQL question
> Hi, which Flink version are you on? As I recall, this problem only exists in older versions.
> Could you also paste the SQL?
>
> Best regards,
> Leonard
>
>
>
> > On Oct 22, 2022, at 11:11 AM, 邮件帮助中心 <yinghua...@163.com> wrote:
> >
> > Hi everyone,
> >    While developing a project recently, we found that when doing a Temporal Table JOIN between a CDC table and a dimension table, the join fields of the two tables must have exactly the same type; otherwise the job submission fails with the following error:
> >    The main method caused an error: Temporal table join requires an equality condition on fields of table.....
> >    To work around this, we tried the following:
> >         1. In the join, apply a CAST to the dimension table's join field, e.g. JOIN ON CAST(tableA.filedA AS INT) = cdc_table_b.fieldB, so that the join fields of the two tables have the same type.
> >         2. Create a view over the dimension table, declaring the desired field types and using CAST in its SELECT, and then join the view with the CDC table; in theory the join field types are then identical.
> >    Unfortunately, neither workaround solved the problem; both still report the same error (The main method caused an error: Temporal table join requires an equality condition on fields of table). If we instead define the dimension table's join field and the CDC table's join field with the same type in the DDL, the job submits without this error, but a cast exception is thrown at runtime while processing data. How can we solve this? Any help would be greatly appreciated!
>
>
