Hi, due to a business requirement, on Flink 1.13 with MySQL CDC 2.1.1 I need to join three tables that fit the temporal-table pattern. Without a WHERE condition on the join result the job runs fine; after adding the WHERE condition it fails with the error below:
SQL:
insert into sink
select count(1) as machine from tbl_schedule_job as job
join tbl_schedule_task FOR SYSTEM_TIME AS OF job.lastUpdateTime as t
on t.jobId = job.jobId
join tbl_broker_node FOR SYSTEM_TIME AS OF t.lastModifiedTime AS n
on t.nodeId = n.id
where job.jobStatus in ('RUNNING','INITING','ERROR')
Error:
Exception in thread "main" java.lang.AssertionError: mismatched type $6 TIMESTAMP(3)
    at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2710)
    at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2688)
    at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
    at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:33)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:268)
    at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:238)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:256)
    at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1811)
    at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:189)
    at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:377)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
    at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:168)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1516)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:738)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:854)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
    at com.test.FlinkCdcTest.main(FlinkCdcTest.java:107)
DDL statements:
CREATE TABLE `tbl_schedule_job`
(`jobId` VARCHAR ,
 `appId` VARCHAR ,
 `jobStatus` VARCHAR,
 `lastUpdateTime` TIMESTAMP(3),
  PRIMARY KEY (`jobId`) NOT ENFORCED,
  WATERMARK FOR lastUpdateTime AS lastUpdateTime - INTERVAL '1' MINUTE
 )WITH(
     'connector' = 'mysql-cdc',
     'hostname' = 'xxx',
     'port' = '3306',
     'username' = 'xxx',
     'password' = 'xxx',
     'database-name' = 'xxx',
     'table-name' = 'xxx'
 );
CREATE TABLE `tbl_schedule_task`
( `jobId` VARCHAR,
  `nodeId` VARCHAR,
  `lastModifiedTime` TIMESTAMP(3),
   PRIMARY KEY (`jobId`) NOT ENFORCED,
   WATERMARK FOR lastModifiedTime AS lastModifiedTime - INTERVAL '1' MINUTE
 )WITH(
     'connector' = 'mysql-cdc',
     'hostname' = 'xxx',
     'port' = '3306',
     'username' = 'xxx',
     'password' = 'xxx',
     'database-name' = 'xxx',
     'table-name' = 'xxx'
 );
 CREATE TABLE `tbl_broker_node`
 (`id` VARCHAR ,
 `resourcesSpec` VARCHAR ,
 `region` VARCHAR ,
 `machineStatus` VARCHAR ,
 `poolId` VARCHAR,
 `lastUpdateTime` TIMESTAMP(3),
  PRIMARY KEY (`id`) NOT ENFORCED,
  WATERMARK FOR lastUpdateTime AS lastUpdateTime - INTERVAL '1' MINUTE
 )WITH(
     'connector' = 'mysql-cdc',
     'hostname' = 'xxx',
     'port' = '3306',
     'username' = 'xxxx',
     'password' = 'xxx',
     'database-name' = 'xxx',
     'table-name' = 'xxx'
 );
CREATE table sink(
  machine BIGINT,
  PRIMARY KEY (`machine`) NOT ENFORCED
) with (
  'connector' = 'upsert-kafka',
  'property.version' = 'universal',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'sink',
  'topic' = 'sink_test',
  'key.format' = 'json',
  'value.format' = 'json'
);

To work around this, I tried moving the WHERE condition into a subquery, which fails as follows:
SQL:
insert into sink
select count(1) as machine
from (
(select * from tbl_schedule_job job where job.jobStatus in ('RUNNING','INITING','ERROR')) as job
join tbl_schedule_task FOR SYSTEM_TIME AS OF job.lastUpdateTime as t
on t.jobId = job.jobId
join tbl_broker_node FOR SYSTEM_TIME AS OF t.lastModifiedTime AS n
on t.nodeId = n.id)
Error:
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "as" at line 4, column 90.
Was expecting one of:
    "EXCEPT" ...
    "FETCH" ...
    "INTERSECT" ...
    "LIMIT" ...
    "OFFSET" ...
    "ORDER" ...
    "MINUS" ...
    "UNION" ...
    ")" ...
    "." ...
    "NOT" ...
    "IN" ...
    "<" ...
    "<=" ...
    ">" ...
    ">=" ...
    "=" ...
    "<>" ...
    "!=" ...
    "BETWEEN" ...
    "LIKE" ...
    "SIMILAR" ...
    "+" ...
    "-" ...
    "*" ...
    "/" ...
    "%" ...
    "||" ...
    "AND" ...
    "OR" ...
    "IS" ...
    "MEMBER" ...
    "SUBMULTISET" ...
    "CONTAINS" ...
    "OVERLAPS" ...
    "EQUALS" ...
    "PRECEDES" ...
    "SUCCEEDS" ...
    "IMMEDIATELY" ...
    "MULTISET" ...
    "[" ...
    "FORMAT" ...
    "YEAR" ...
    "YEARS" ...
    "MONTH" ...
    "MONTHS" ...
    "DAY" ...
    "DAYS" ...
    "HOUR" ...
    "HOURS" ...
    "MINUTE" ...
    "MINUTES" ...
    "SECOND" ...
    "SECONDS" ...

    at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:96)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:722)
    at com.test.FlinkCdcTest.main(FlinkCdcTest.java:107)

Yet this same SQL runs fine in DBeaver, so I don't know where the syntax problem is.
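One possibly relevant detail: the parser's expected-token list (EXCEPT, FETCH, INTERSECT, ")", ...) is what Calcite expects after a parenthesized query expression, so the outer parentheses wrapping the whole FROM clause may be making Calcite read `(select ...)` as a query rather than a derived table, unlike MySQL's parser in DBeaver. A sketch of the variant without the outer parentheses, under that assumption (not verified):

```sql
-- Same query, with the outer parentheses around the joined FROM
-- clause removed; the filtering subquery itself is kept.
insert into sink
select count(1) as machine
from (select * from tbl_schedule_job
      where jobStatus in ('RUNNING','INITING','ERROR')) as job
join tbl_schedule_task FOR SYSTEM_TIME AS OF job.lastUpdateTime as t
  on t.jobId = job.jobId
join tbl_broker_node FOR SYSTEM_TIME AS OF t.lastModifiedTime AS n
  on t.nodeId = n.id
```

Whether the event-time attribute `lastUpdateTime` survives through the filtering subquery for the temporal join is a separate question I have not confirmed.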
