[ 
https://issues.apache.org/jira/browse/FLINK-31163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17723514#comment-17723514
 ] 

Sergey Nuyanzin edited comment on FLINK-31163 at 5/17/23 3:37 PM:
------------------------------------------------------------------

Partially it is fixed within CALCITE-4913 and could come with Calcite 1.31.0 
upgrade FLINK-28744
Partially need to debug a bit correlation vars namespace resolution (probably 
on Calcite level)


was (Author: sergey nuyanzin):
Partially it fixed within CALCITE-4913 and could come with Calcite 1.31.0 
upgrade FLINK-28744
Partially need to debug a bit correlation vars namespace resolution (probably 
on Calcite level)

> Unexpected correlate variable $cor0 in the plan error in where clause
> ---------------------------------------------------------------------
>
>                 Key: FLINK-31163
>                 URL: https://issues.apache.org/jira/browse/FLINK-31163
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.16.0
>            Reporter: P Rohan Kumar
>            Priority: Major
>
> {code:java}
> val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(env)
> val accountsTd = 
> TableDescriptor.forConnector("datagen").option("rows-per-second", "10")
>   .option("number-of-rows", "10")
>   .schema(Schema
>     .newBuilder()
>     .column("account_num", DataTypes.VARCHAR(2147483647))
>     .column("acc_name", DataTypes.VARCHAR(2147483647))
>     .column("acc_phone_num", DataTypes.VARCHAR(2147483647))
>     .build())
>   .build()
> val accountsTable = tableEnv.from(accountsTd)
> tableEnv.createTemporaryView("accounts", accountsTable)
> val transactionsTd = 
> TableDescriptor.forConnector("datagen").option("rows-per-second", "10")
>   .option("number-of-rows", "10")
>   .schema(Schema
>     .newBuilder()
>     .column("account_num", DataTypes.VARCHAR(2147483647))
>     .column("transaction_place", DataTypes.VARCHAR(2147483647))
>     .column("transaction_time", DataTypes.BIGINT())
>     .column("amount", DataTypes.INT())
>     .build())
>   .build()
> val transactionsTable = tableEnv.from(transactionsTd)
> tableEnv.createTemporaryView("transaction_data", transactionsTable)
> val newTable = tableEnv.sqlQuery("select   acc.account_num,  (select count(*) 
> from transaction_data where transaction_place = trans.transaction_place and 
> account_num = acc.account_num)  from  accounts acc,transaction_data trans")
> tableEnv.toChangelogStream(newTable).print()
> env.execute() {code}
> I get the following error if I run the above code.
>  
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> unexpected correlate variable $cor0 in the plan
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:59)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>     at scala.collection.immutable.Range.foreach(Range.scala:158)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
>     at 
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
>     at 
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
>     at 
> org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.scala:160)
>     at org.example.WhereClauseBug$.main(WhereClauseBug.scala:50)
>     at org.example.WhereClauseBug.main(WhereClauseBug.scala)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to