Hi!

I could not reproduce this problem. Is there any other SQL further downstream? In particular, any SQL that applies a filter on aaa.columnInfos [ 'EVENT_ID' ].newValue?

You could first try to find a case that reproduces it locally; tableEnv.explainSql() is a quick way to check whether a plan can be generated at all. If you find a SQL statement that reproduces it, please open an issue in JIRA.

On Thu, Sep 2, 2021 at 10:43 AM, Wayne <1...@163.com> wrote:

> Table a
>
> CREATE TABLE aaa (
>   `columnInfos` MAP<STRING NOT NULL, ROW<oldValue STRING NULL, newValue STRING, name STRING NOT NULL, isKeyColumn BOOLEAN NOT NULL, type STRING NOT NULL> NOT NULL> NOT NULL,
>   .......
>   procTime AS PROCTIME()
> ) WITH (
>   'connector' = 'kafka' ,
>   .....
>   'format' = 'avro'
> )
>
> Table b
>
> CREATE TABLE bbb (
>   `event_id` STRING,
>   `genre_id` STRING,
>   `user_guid` STRING,
>   ......
>   `uuid` STRING
> ) WITH (
>   'connector' = 'kafka' ,
>   .....
>   'format' = 'avro'
> )
>
> My SQL
>
> SELECT
>   CONCAT_WS(
>     '_',
>     user_guid,
>     CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ) AS BIGINT ) AS STRING )) AS rowkey,
>   columnInfos [ 'TICKET_COST' ].newValue AS cost,
>   DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 'yyyy-MM-dd:HH:mm:ss.SSSSSSSSS' ), 'yyyy-MM-dd' ) AS crdate,
>   columnInfos [ 'EVENT_ID' ].newValue AS eventid,
>   columnInfos [ 'QUANTITY' ].newValue AS quantity,
>   genre_id AS genreId,
>   user_guid AS userGuid
> FROM
>   aaa
>   LEFT JOIN bbb FOR SYSTEM_TIME AS OF aaa.procTime ON aaa.columnInfos [ 'EVENT_ID' ].newValue = bbb.event_id and aaa.columnInfos [ 'EVENT_ID' ].newValue is not null and bbb.event_id is not null
>
> Whether or not I add the trailing "and aaa.columnInfos [ 'EVENT_ID' ].newValue is not null and bbb.event_id is not null", I get this same error.
>
> At 2021-09-02 09:13:11, "Caizhi Weng" <tsreape...@gmail.com> wrote:
> >Hi!
> >
> >Are Orders and Customers the source tables themselves? Or is there a filter condition on customer_id or id somewhere between the source table and the dimension-table join?
> >
> >It is possible that an earlier filter on customer_id or id (for example customer_id = 1) caused the equality condition of the dimension-table join to be optimized into customer_id = 1 and id = 1, with each of them pushed down to before the join, which would leave the join itself without an equality key.
> >
> >On Wed, Sep 1, 2021 at 6:27 PM, Wayne <1...@163.com> wrote:
> >
> >> My Flink version is flink-1.12.2-bin-scala_2.12
> >> My SQL is
> >> SELECT o.order_id, o.total, c.country, c.zip
> >> FROM Orders AS o
> >> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> >> ON o.customer_id = c.id and o.customer_id is not null and c.id is not null ;
> >> or
> >> SELECT o.order_id, o.total, c.country, c.zip
> >> FROM Orders AS o
> >> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> >> ON o.customer_id = c.id ;
> >>
> >> Both fail with the error below. Could you help me see what the correct way to write this is? Many thanks.
> >>
> >> Exception in thread "main" org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
> >> at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
> >> at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
> >> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
> >> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> >> at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
> >> at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> >> at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> >> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> >> at scala.collection.Iterator.foreach(Iterator.scala:943)
> >> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> >> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> >> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> >> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> >> at scala.collection.immutable.Range.foreach(Range.scala:158)
> >> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> >> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> >> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> >> at scala.collection.Iterator.foreach(Iterator.scala:943)
> >> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> >> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> >> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> >> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> >> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> >> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
> >> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
> >> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
> >> at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> >> at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
> >> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
> >> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
> >> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707)
> >> at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
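
For anyone trying the local check suggested in the reply at the top of this thread, here is a rough sketch of what a reproduction attempt might look like. It assumes the Orders and Customers tables from the original question are already registered. The view name filtered_orders and the filter customer_id = 1 are hypothetical; they only illustrate the kind of upstream filter Caizhi described and do not come from Wayne's job. Whether this actually triggers the exception may depend on the Flink version.

```sql
-- Hypothetical upstream filter (an assumption, not taken from the thread):
-- a constant predicate on the join key ahead of the dimension-table join.
CREATE TEMPORARY VIEW filtered_orders AS
SELECT * FROM Orders WHERE customer_id = 1;

-- EXPLAIN only asks the planner for a plan; no job is submitted.
-- If the planner rewrites the join condition into two constant filters,
-- this is where "the join key in Temporal Table Join can not be empty"
-- would be expected to show up.
EXPLAIN PLAN FOR
SELECT o.order_id, o.total, c.country, c.zip
FROM filtered_orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```

Passing the same SELECT to tableEnv.explainSql(...) from a Java or Scala program should give the same check. If the statement fails with the filter in place but plans fine without it, that pair of statements would be a compact reproduction to attach to a JIRA issue.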