你好,因业务需求,flink 1.13,MySQL CDC 2.1.1下需要将三张满足时态表结构的表关联,在没有对关联结果加where条件时,可以正常运行,加了where条件后,报错如下: SQL: insert into sink select count(1) as machine from tbl_schedule_job as job join tbl_schedule_task FOR SYSTEM_TIME AS OF job.lastUpdateTime as t on t.jobId = job.jobId FOR SYSTEM_TIME AS OF t.lastModifiedTime AS n where job.jobStatus in ('RUNNING','INITING','ERROR') 报错: Exception in thread "main" java.lang.AssertionError: mismatched type $6 TIMESTAMP(3) at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2710) at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2688) at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112) at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158) at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110) at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:33) at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:268) at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:238) at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:256) at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1811) at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:189) at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:377) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271) at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74) at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:168) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1516) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:738) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:854) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728) at com.test.FlinkCdcTest.main(FlinkCdcTest.java:107) 建表语句: CREATE TABLE `tbl_schedule_job` (`jobId` VARCHAR , `appId` VARCHAR , `jobStatus` VARCHAR, `lastUpdateTime` TIMESTAMP(3), PRIMARY KEY (`jobId`) NOT ENFORCED, WATERMARK FOR lastUpdateTime AS lastUpdateTime - INTERVAL '1' MINUTE )WITH( 'connector' = 'mysql-cdc', 'hostname' = 'xxx', 'port' = '3306', 'username' = 'xxx', 'password' = 'xxx', 'database-name' = 'xxx', 'table-name' = 'xxx' ); CREATE TABLE `tbl_schedule_task` ( `jobId` VARCHAR, `nodeId` VARCHAR, `lastModifiedTime` TIMESTAMP(3), PRIMARY KEY (`jobId`) NOT ENFORCED, WATERMARK FOR lastModifiedTime AS lastModifiedTime - INTERVAL '1' MINUTE )WITH( 'connector' = 'mysql-cdc', 'hostname' = 'xxx', 'port' = '3306', 'username' = 'xxx', 'password' = 'xxx', 'database-name' = 'xxx', 'table-name' = 'xxx' ); CREATE TABLE `tbl_broker_node` (`id` VARCHAR , `resourcesSpec` VARCHAR , `region` VARCHAR , `machineStatus` VARCHAR , `poolId` VARCHAR, `lastUpdateTime` TIMESTAMP(3), PRIMARY KEY (`id`) NOT ENFORCED, WATERMARK FOR lastUpdateTime AS lastUpdateTime - INTERVAL '1' MINUTE )WITH( 'connector' = 'mysql-cdc', 'hostname' = 'xxx', 'port' = '3306', 'username' = 'xxxx', 'password' = 'xxx', 'database-name' = 'xxx', 'table-name' = 'xxx' ); CREATE table sink( machine BIGINT, PRIMARY KEY (`machine`) NOT ENFORCED ) with ( 'connector' = 'upsert-kafka', 'property.version' = 'universal', 'properties.bootstrap.servers' = '', 'properties.group.id' = 'sink', 'topic' = 'sink_test', 'key.format' = 'json', 'value.format' = 'json' );
为了解决以上问题,我曾尝试将where条件写到子查询中,报错情况如下: SQL: insert into sink select count(1) as machine from ( (select * from tbl_schedule_job job where job.jobStatus in ('RUNNING','INITING','ERROR'))as job join tbl_schedule_task FOR SYSTEM_TIME AS OF job.lastUpdateTime as t on t.jobId = job.jobId join tbl_broker_node FOR SYSTEM_TIME AS OF t.lastModifiedTime AS n on t.nodeId = n.id) 报错: Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "as" at line 4, column 90. Was expecting one of: "EXCEPT" ... "FETCH" ... "INTERSECT" ... "LIMIT" ... "OFFSET" ... "ORDER" ... "MINUS" ... "UNION" ... ")" ... "." ... "NOT" ... "IN" ... "<" ... "<=" ... ">" ... ">=" ... "=" ... "<>" ... "!=" ... "BETWEEN" ... "LIKE" ... "SIMILAR" ... "+" ... "-" ... "*" ... "/" ... "%" ... "||" ... "AND" ... "OR" ... "IS" ... "MEMBER" ... "SUBMULTISET" ... "CONTAINS" ... "OVERLAPS" ... "EQUALS" ... "PRECEDES" ... "SUCCEEDS" ... "IMMEDIATELY" ... "MULTISET" ... "[" ... "FORMAT" ... "YEAR" ... "YEARS" ... "MONTH" ... "MONTHS" ... "DAY" ... "DAYS" ... "HOUR" ... "HOURS" ... "MINUTE" ... "MINUTES" ... "SECOND" ... "SECONDS" ... at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:96) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:722) at com.test.FlinkCdcTest.main(FlinkCdcTest.java:107) 然而这段SQL可以正常在DBeaver中运行,所以我不知道是哪里格式出了问题?