[ 
https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-28986:
------------------------------
    Summary: UNNEST function with nested filter fails to generate plan  (was: 
UNNEST function with nested fails to generate plan)

> UNNEST function with nested filter fails to generate plan
> ---------------------------------------------------------
>
>                 Key: FLINK-28986
>                 URL: https://issues.apache.org/jira/browse/FLINK-28986
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.0
>            Reporter: Jane Chan
>            Assignee: Jane Chan
>            Priority: Major
>         Attachments: image-2022-08-16-14-36-07-061.png
>
>
> h3. How to reproduce
> add the following case to TableEnvironmentITCase
> {code:scala}
> @Test
> def debug(): Unit = {
>   tEnv.executeSql(
>     s"""
>        |CREATE TEMPORARY TABLE source_kafka_wip_his_all (
>        |  GUID varchar,
>        |  OPERATION varchar,
>        |  PRODUCTID varchar,
>        |  LOTNO varchar,
>        |  SERIALNO varchar,
>        |  QUERYSERIALNO varchar,
>        |  SERIALNO1 varchar,
>        |  SERIALNO2 varchar,
>        |  WIPORDERNO varchar,
>        |  WIPORDERTYPE varchar,
>        |  VIRTUALLOT varchar,
>        |  PREOPERATION varchar,
>        |  NORMALPREOPERATION varchar,
>        |  PROCESSID varchar,
>        |  EQUIPMENT varchar,
>        |  INBOUNDDATE varchar,
>        |  OUTBOUNDDATE varchar,
>        |  REWORK varchar,
>        |  REWORKPROCESSID varchar,
>        |  CONTAINER varchar,
>        |  WIPCONTENTCLASSID varchar,
>        |  STATUSCODE varchar,
>        |  WIPSTATUS varchar,
>        |  TESTPROCESSID varchar,
>        |  TESTORDERTYPE varchar,
>        |  TESTORDER varchar,
>        |  TEST varchar,
>        |  SORTINGPROCESSID varchar,
>        |  SORTINGORDERTYPE varchar,
>        |  SORTINGORDER varchar,
>        |  SORTING varchar,
>        |  MINO varchar,
>        |  GROUPCODE varchar,
>        |  HIGHLOWGROUP varchar,
>        |  PRODUCTNO varchar,
>        |  FACILITY varchar,
>        |  WIPLINE varchar,
>        |  CHILDEQUCODE varchar,
>        |  STATION varchar,
>        |  QTY varchar,
>        |  PASS_FLAG varchar,
>        |  DEFECTCODELIST varchar,
>        |  ISFIRST varchar,
>        |  PARALIST ARRAY<ROW(GUID string,WIP_HIS_GUID string,QUERYSERIALNO 
> string,OPERATION string,REWORKPROCESSID string,CHARACTERISTIC 
> string,CHARACTERISTICREVISION string,CHARACTERISTICTYPE 
> string,CHARACTERISTICCLASS string,UPPERCONTROLLIMIT string,TARGETVALUE 
> string,LOWERCONTROLLIMIT string,TESTVALUE string,TESTATTRIBUTE 
> string,TESTINGSTARTDATE string,TESTFINISHDATE string,UOMCODE 
> string,DEFECTCODE string,SPECPARAMID string,STATION string,GP_TIME 
> string,REFERENCEID string,LASTUPDATEON string,LASTUPDATEDBY string,CREATEDON 
> string,CREATEDBY string,ACTIVE string,LASTDELETEON string,LASTDELETEDBY 
> string,LASTREACTIVATEON string,LASTREACTIVATEDBY string,ARCHIVEID 
> string,LASTARCHIVEON string,LASTARCHIVEDBY string,LASTRESTOREON 
> string,LASTRESTOREDBY string,ROWVERSIONSTAMP string)>,
>        |  REFERENCEID varchar,
>        |  LASTUPDATEON varchar,
>        |  LASTUPDATEDBY varchar,
>        |  CREATEDON varchar,
>        |  CREATEDBY varchar,
>        |  ACTIVE varchar,
>        |  LASTDELETEON varchar,
>        |  LASTDELETEDBY varchar,
>        |  LASTREACTIVATEON varchar,
>        |  LASTREACTIVATEDBY varchar,
>        |  ARCHIVEID varchar,
>        |  LASTARCHIVEON varchar,
>        |  LASTARCHIVEDBY varchar,
>        |  LASTRESTOREON varchar,
>        |  LASTRESTOREDBY varchar,
>        |  ROWVERSIONSTAMP varchar,
>        |  proctime as PROCTIME()
>        |  ) with (
>        |  'connector' = 'datagen'
>        |)
>        |""".stripMargin)
>   tEnv.executeSql(
>     s"""
>        |create TEMPORARY view transform_main_data as
>        |select
>        |      r.GUID as wip_his_guid,
>        |      r.EQUIPMENT as equipment,
>        |      r.WIPLINE as wipline,
>        |      r.STATION as station,
>        |      cast(r.PROCESSID as decimal) as processid,
>        |      r.PRODUCTNO as productno,
>        |      t.TESTFINISHDATE as testfinishdate,
>        |      t.OPERATION as operation,
>        |      t.CHARACTERISTIC as characteristic,
>        |      t.LOWERCONTROLLIMIT as lowercontrollimit,
>        |      t.UPPERCONTROLLIMIT as uppercontrollimit,
>        |      t.TARGETVALUE as targetvalue,
>        |      t.DEFECTCODE as defectcode,
>        |      t.TESTVALUE as testvalue,
>        |      t.CHARACTERISTICTYPE as characteristictype,
>        |      proctime
>        |  from
>        |  (select
>        |      GUID,
>        |      EQUIPMENT,
>        |      WIPLINE,
>        |      STATION,
>        |      PROCESSID,
>        |      PRODUCTNO,
>        |      PARALIST,
>        |      proctime
>        |  FROM source_kafka_wip_his_all) r
>        |  cross join
>        |  unnest(PARALIST) as t 
> (GUID,WIP_HIS_GUID,QUERYSERIALNO,OPERATION,REWORKPROCESSID,CHARACTERISTIC,CHARACTERISTICREVISION,CHARACTERISTICTYPE,CHARACTERISTICCLASS,UPPERCONTROLLIMIT,TARGETVALUE,LOWERCONTROLLIMIT,TESTVALUE,TESTATTRIBUTE,TESTINGSTARTDATE,TESTFINISHDATE,UOMCODE,DEFECTCODE,SPECPARAMID,STATION,GP_TIME,REFERENCEID,LASTUPDATEON,LASTUPDATEDBY,CREATEDON,CREATEDBY,ACTIVE,LASTDELETEON,LASTDELETEDBY,LASTREACTIVATEON,LASTREACTIVATEDBY,ARCHIVEID,LASTARCHIVEON,LASTARCHIVEDBY,LASTRESTOREON,LASTRESTOREDBY,ROWVERSIONSTAMP)
>        |  where t.CHARACTERISTICTYPE = '2'
>        |""".stripMargin)
>   tEnv.executeSql(
>     s"""
>        |explain plan for
>        |select * from transform_main_data
>        |where operation not in 
> ('G1208','G1209','G1211','G1213','G1206','G1207','G1214','G1215','G1282','G1292','G1216')
>        |""".stripMargin).print()
> } {code}
> Stacktrace
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: LogicalProject(inputs=[0..3], 
> exprs=[[CAST($4):DECIMAL(10, 0), $5, $23, $11, $13, $19, $17, $18, $25, $20, 
> $15, $7]])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
> requiredColumns=[{6}])
>    :- LogicalProject(inputs=[0], exprs=[[$14, $36, $38, $13, $34, $43, 
> PROCTIME()]])
>    :  +- LogicalTableScan(table=[[default_catalog, default_database, 
> source_kafka_wip_his_all]])
>    +- LogicalFilter(condition=[AND(SEARCH($7, 
> Sarg[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), SEARCH($3, 
> Sarg[(-∞.._UTF-16LE'G1206'), (_UTF-16LE'G1206'.._UTF-16LE'G1207'), 
> (_UTF-16LE'G1207'.._UTF-16LE'G1208'), (_UTF-16LE'G1208'.._UTF-16LE'G1209'), 
> (_UTF-16LE'G1209'.._UTF-16LE'G1211'), (_UTF-16LE'G1211'.._UTF-16LE'G1213'), 
> (_UTF-16LE'G1213'.._UTF-16LE'G1214'), (_UTF-16LE'G1214'.._UTF-16LE'G1215'), 
> (_UTF-16LE'G1215'.._UTF-16LE'G1216'), (_UTF-16LE'G1216'.._UTF-16LE'G1282'), 
> (_UTF-16LE'G1282'.._UTF-16LE'G1292'), (_UTF-16LE'G1292'..+∞)]:CHAR(5) 
> CHARACTER SET "UTF-16LE"))])
>       +- Uncollect
>          +- LogicalProject(exprs=[[$cor1.PARALIST]])
>             +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 
> }]])This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(inputs=[0..3], exprs=[[CAST($4):DECIMAL(10, 0), $5, $23, $11, 
> $13, $19, $17, $18, $25, $20, $15, $7]])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
> requiredColumns=[{6}])
>    :- LogicalProject(inputs=[0], exprs=[[$14, $36, $38, $13, $34, $43, 
> PROCTIME()]])
>    :  +- LogicalTableScan(table=[[default_catalog, default_database, 
> source_kafka_wip_his_all]])
>    +- LogicalFilter(condition=[AND(SEARCH($7, 
> Sarg[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), SEARCH($3, 
> Sarg[(-∞.._UTF-16LE'G1206'), (_UTF-16LE'G1206'.._UTF-16LE'G1207'), 
> (_UTF-16LE'G1207'.._UTF-16LE'G1208'), (_UTF-16LE'G1208'.._UTF-16LE'G1209'), 
> (_UTF-16LE'G1209'.._UTF-16LE'G1211'), (_UTF-16LE'G1211'.._UTF-16LE'G1213'), 
> (_UTF-16LE'G1213'.._UTF-16LE'G1214'), (_UTF-16LE'G1214'.._UTF-16LE'G1215'), 
> (_UTF-16LE'G1215'.._UTF-16LE'G1216'), (_UTF-16LE'G1216'.._UTF-16LE'G1282'), 
> (_UTF-16LE'G1282'.._UTF-16LE'G1292'), (_UTF-16LE'G1292'..+∞)]:CHAR(5) 
> CHARACTER SET "UTF-16LE"))])
>       +- Uncollect
>          +- LogicalProject(exprs=[[$cor1.PARALIST]])
>             +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 
> }]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:527)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:96)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:51)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:695)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1356)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:733)
>     at 
> org.apache.flink.table.api.TableEnvironmentITCase.debug(TableEnvironmentITCase.scala:695)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.junit.runners.Suite.runChild(Suite.java:128)
>     at org.junit.runners.Suite.runChild(Suite.java:27)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>     at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>     at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
>     at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There 
> are not enough rules to produce a node with desired properties: 
> convention=LOGICAL, FlinkRelDistributionTraitDef=any, 
> MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], 
> UpdateKindTraitDef=[NONE].
> Missing conversion is Uncollect[convention: NONE -> LOGICAL]
> There is 1 empty subset: rel#485:RelSubset#4.LOGICAL.any.None: 
> 0.[NONE].[NONE], the relevant part of the original plan is as follows
> 460:Uncollect
>   458:LogicalProject(subset=[rel#459:RelSubset#3.NONE.any.None: 
> 0.[NONE].[NONE]], PARALIST=[$cor1.PARALIST])
>     17:LogicalValues(subset=[rel#457:RelSubset#2.NONE.any.None: 
> 0.[NONE].[NONE]], tuples=[[{ 0 }]]){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to