YuvalItzchakov edited a comment on pull request #15307:
URL: https://github.com/apache/flink/pull/15307#issuecomment-815626304


   @fsk119 I have a problem with `FilterableSourceTest`. If I move the new rule back up, after the watermark pushdown, `FilterableSourceTest.testFilterPushdownWithUdf` fails because multiple Calc nodes are created, with the parent Calc holding the actual filter.
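
   Why the ordering matters can be sketched with a toy model (plain Python, not the Flink/Calcite API; all names here are hypothetical): a pushdown rule that only matches a single Calc directly above a scan will not fire while two stacked Calcs are present, but fires once a merge rule has collapsed them.

   ```python
   # Toy model (NOT Flink API) of rule-order sensitivity: a filter-pushdown
   # rule that only matches one Calc directly above a Scan, and a merge rule
   # that collapses adjacent Calcs. Pushdown before merge: blocked by the
   # stacked Calcs. Merge before pushdown: the rule matches.

   class Node:
       def __init__(self, kind, child=None):
           self.kind = kind      # "Calc", "Scan", or "ScanWithFilter"
           self.child = child

   def merge_calcs(node):
       # Collapse Calc -> Calc into a single Calc (one bottom-up pass).
       if node is None:
           return None
       node.child = merge_calcs(node.child)
       if node.kind == "Calc" and node.child and node.child.kind == "Calc":
           return Node("Calc", node.child.child)
       return node

   def push_filter(node):
       # Fires only when exactly one Calc sits directly above the Scan.
       if node.kind == "Calc" and node.child and node.child.kind == "Scan":
           return Node("ScanWithFilter")
       return node  # stacked Calcs: rule does not match

   plan = Node("Calc", Node("Calc", Node("Scan")))   # two Calcs, as in the test

   print(push_filter(plan).kind)                 # → Calc (pushdown blocked)
   print(push_filter(merge_calcs(plan)).kind)    # → ScanWithFilter
   ```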
   
   If I move the rule down after the `FlinkCalcMergeRule`, the 
`stream.LookupJoinTest` tests fail. For example, 
`testJoinTemporalTableWithCalcPushDown` fails with:
   
   ```
   org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

   FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, CAST(10) AS age])
   +- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
      :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]])
      +- FlinkLogicalSnapshot(period=[$cor0.proctime])
         +- FlinkLogicalCalc(select=[id, name], where=[AND(=(age, 10), >(CAST(name), 1000))])
            +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable, filter=[]]], fields=[id, name, age])

   This exception indicates that the query uses an unsupported SQL feature.
   Please check the documentation for the set of currently supported SQL features.

        at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
        at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:281)
        at org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
        at org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
        at org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:583)
        at org.apache.flink.table.planner.plan.stream.sql.join.LookupJoinTest.testJoinTemporalTableWithCalcPushDown(LookupJoinTest.scala:327)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
        at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runners.Suite.runChild(Suite.java:128)
        at org.junit.runners.Suite.runChild(Suite.java:27)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
   Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE].
   Missing conversion is FlinkLogicalJoin[convention: LOGICAL -> STREAM_PHYSICAL]
   There is 1 empty subset: rel#5034:RelSubset#11.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows
   5021:FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
     4964:FlinkLogicalDataStreamTableScan(subset=[rel#5015:RelSubset#7.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, MyTable]])
     5019:FlinkLogicalSnapshot(subset=[rel#5020:RelSubset#10.LOGICAL.any.None: 0.[NONE].[NONE]], period=[$cor0.proctime])
       5017:FlinkLogicalCalc(subset=[rel#5018:RelSubset#9.LOGICAL.any.None: 0.[NONE].[NONE]], select=[id, name], where=[AND(=(age, 10), >(CAST(name), 1000))])
         5008:FlinkLogicalTableSourceScan(subset=[rel#5016:RelSubset#8.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, LookupTable, filter=[]]], fields=[id, name, age])
   ```
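
   As I read the `CannotPlanException`, the gist can be sketched with a toy model (plain Python, not the Calcite API; the `CONVERTIBLE` set and function names are hypothetical): the Volcano planner needs some conversion rule to fire for every logical node on the way to `STREAM_PHYSICAL`, and "Missing conversion is FlinkLogicalJoin" means no physical join rule matched this plan shape, leaving the join's subset empty and the whole plan unplannable.

   ```python
   # Toy sketch (NOT the Calcite API) of "Missing conversion ... LOGICAL ->
   # STREAM_PHYSICAL": if no conversion rule covers a node kind, the planner
   # cannot produce a physical plan at all and reports the offending node.

   class CannotPlan(Exception):
       pass

   # Node kinds some physical rule handled in this toy run; "Join" is absent,
   # mirroring the lookup-join rule not matching this plan shape.
   CONVERTIBLE = {"Calc", "Scan", "Snapshot", "DataStreamTableScan"}

   def to_physical(logical_nodes):
       for node in logical_nodes:
           if node not in CONVERTIBLE:
               raise CannotPlan(
                   f"Missing conversion is FlinkLogical{node}"
                   "[convention: LOGICAL -> STREAM_PHYSICAL]")
       return [f"StreamPhysical{n}" for n in logical_nodes]

   plan = ["Calc", "Join", "DataStreamTableScan", "Snapshot", "Calc", "Scan"]
   ```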
   
   I'm not sure how to tackle this.


