[ https://issues.apache.org/jira/browse/FLINK-21444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-21444:
-----------------------------------
    Labels: stale-assigned  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 14 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone else may work on it. If the "stale-assigned" label is not removed in 7 days, the issue will be automatically unassigned.


> Lookup joins should deal with intermediate table scans correctly
> ----------------------------------------------------------------
>
>                 Key: FLINK-21444
>                 URL: https://issues.apache.org/jira/browse/FLINK-21444
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0, 1.13.0
>            Reporter: Caizhi Weng
>            Assignee: Caizhi Weng
>            Priority: Major
>              Labels: stale-assigned
>
> Add the following test case to {{org.apache.flink.table.planner.runtime.stream.sql.LookupJoinITCase}}:
> {code:scala}
> @Test
> def myTest(): Unit = {
>   // Enable source reuse and digest-based block reuse so the optimizer splits
>   // the shared scans into separate blocks (this is what triggers the bug).
>   tEnv.getConfig.getConfiguration.setBoolean(
>     OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, true)
>   tEnv.getConfig.getConfiguration.setBoolean(
>     RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
>
>   val ddl1 =
>     """
>       |CREATE TABLE sink1 (
>       |  `id` BIGINT
>       |) WITH (
>       |  'connector' = 'blackhole'
>       |)
>       |""".stripMargin
>   tEnv.executeSql(ddl1)
>
>   val ddl2 =
>     """
>       |CREATE TABLE sink2 (
>       |  `id` BIGINT
>       |) WITH (
>       |  'connector' = 'blackhole'
>       |)
>       |""".stripMargin
>   tEnv.executeSql(ddl2)
>
>   val sql1 = "INSERT INTO sink1 SELECT T.id FROM src AS T JOIN user_table " +
>     "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.id = D.id"
>   val sql2 = "INSERT INTO sink2 SELECT T.id FROM src AS T JOIN user_table " +
>     "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.id + 1 = D.id"
>
>   val stmtSet = tEnv.createStatementSet()
>   stmtSet.addInsertSql(sql1)
>   stmtSet.addInsertSql(sql2)
>   stmtSet.execute().await()
> }
> {code}
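> For context: the two options enabled at the top of the test make the optimizer share the {{src}} and {{user_table}} scans between the two INSERT statements by splitting them into separate optimization blocks, which is what introduces the intermediate table scans in the plan below. As a hedged workaround sketch (my assumption, not verified against this bug; it sidesteps the rule limitation rather than fixing it), turning digest-based block reuse back off should keep the lookup source inline:
> {code:scala}
> // Workaround sketch: with block reuse off, the user_table lookup source
> // stays a plain table scan, so the temporal join rewrite can still find
> // its primary key.
> tEnv.getConfig.getConfiguration.setBoolean(
>   RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
>   false)
> {code}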
> The following exception will occur:
> {code}
> org.apache.flink.table.api.ValidationException: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:
> FlinkLogicalJoin(condition=[AND(=($0, $2), __INITIAL_TEMPORAL_JOIN_CONDITION($1, __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($2)))], joinType=[inner])
>   FlinkLogicalCalc(select=[id, proctime])
>     FlinkLogicalIntermediateTableScan(table=[[IntermediateRelTable_0]], fields=[id, len, content, proctime])
>   FlinkLogicalSnapshot(period=[$cor0.proctime])
>     FlinkLogicalCalc(select=[id])
>       FlinkLogicalIntermediateTableScan(table=[[IntermediateRelTable_1]], fields=[age, id, name])
>   at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule.org$apache$flink$table$planner$plan$rules$logical$TemporalJoinRewriteWithUniqueKeyRule$$validateRightPrimaryKey(TemporalJoinRewriteWithUniqueKeyRule.scala:124)
>   at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:88)
>   at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:70)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>   at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
>   at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
>   at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:109)
>   at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule$$anon$1.visitCall(TemporalJoinRewriteWithUniqueKeyRule.scala:70)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>   at org.apache.flink.table.planner.plan.rules.logical.TemporalJoinRewriteWithUniqueKeyRule.onMatch(TemporalJoinRewriteWithUniqueKeyRule.scala:70)
>   at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>   at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>   at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>   at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>   at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>   at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>   at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>   at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$StreamCommonSubGraphBasedOptimizer$$inferTraits(StreamCommonSubGraphBasedOptimizer.scala:214)
>   at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer$$anonfun$doOptimize$2.apply(StreamCommonSubGraphBasedOptimizer.scala:90)
>   at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer$$anonfun$doOptimize$2.apply(StreamCommonSubGraphBasedOptimizer.scala:89)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:89)
>   at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>   at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:288)
>   at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:161)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1399)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
>   at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
>   at org.apache.flink.table.planner.runtime.stream.sql.LookupJoinITCase.myTest(LookupJoinITCase.scala:173)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>   at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}
> The {{FlinkLogicalIntermediateTableScan}} nodes in the plan above show that, with source reuse and block reuse enabled, the table scan feeding the lookup join is allocated to a separate optimization block during optimization. However, the planner rules for lookup tables currently do not handle such intermediate table scans, so the temporal join rewrite cannot find the primary key of the lookup table and validation fails.
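> To make the failure mode concrete, here is a small self-contained Scala toy model (illustration only, not Flink planner code; every type below is made up for the example). A key lookup that pattern-matches only the concrete source scan type returns nothing once the optimizer substitutes an intermediate scan, which is exactly the "no primary key can be found" symptom; looking through the intermediate scan to the node it wraps preserves the key:
> {code:scala}
> // Toy stand-ins for planner RelNodes; these are NOT real Flink classes.
> sealed trait Rel
> case class TableSourceScan(primaryKey: Option[String]) extends Rel
> case class IntermediateTableScan(wrapped: Rel) extends Rel
>
> // A rule that only understands the concrete scan type loses the key once
> // the optimizer wraps the source in an intermediate scan:
> def primaryKeyNaive(rel: Rel): Option[String] = rel match {
>   case TableSourceScan(pk) => pk
>   case _                   => None // intermediate scans fall through here
> }
>
> // Looking through the intermediate scan keeps the key visible:
> def primaryKeyThrough(rel: Rel): Option[String] = rel match {
>   case TableSourceScan(pk)          => pk
>   case IntermediateTableScan(inner) => primaryKeyThrough(inner)
> }
>
> val scan = IntermediateTableScan(TableSourceScan(Some("id")))
> primaryKeyNaive(scan)   // None    -> "no primary key can be found"
> primaryKeyThrough(scan) // Some(id)
> {code}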