[ 
https://issues.apache.org/jira/browse/FLINK-20887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17262211#comment-17262211
 ] 

Leonard Xu commented on FLINK-20887:
------------------------------------

Hi, [~TsReaper] 
 I think it's not related to the  FlinkCalcMergeRule because the plan will not 
be matched by the rule and it always fails no matter the  FlinkCalcMergeRule is 
added or not. 
And I think the fail reason is that  the RAND() function materialized when it 
is used rather than when it is declared.

> Non-deterministic functions return different values even if it is referred 
> with the same column name
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-20887
>                 URL: https://issues.apache.org/jira/browse/FLINK-20887
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.0
>            Reporter: Caizhi Weng
>            Priority: Major
>
> Add the following test case to {{CalcITCase.scala}}
> {code:scala}
> @Test
> def testRand(): Unit = {
>   checkResult(
>     s"""
>        |SELECT b - a FROM (
>        |  SELECT r + 5 AS a, r + 7 AS b FROM (
>        |    SELECT RAND() AS r FROM SmallTable3
>        |  ) t1
>        |) t2
>        |""".stripMargin,
>     Seq(row(2), row(2), row(2))
>   )
> }
> {code}
> Failure messages are
> {code}
> Results
>  == Correct Result - 3 ==   == Actual Result - 3 ==
> !2                          1.051329250417921
> !2                          1.3649146677814379
> !2                          1.787784536771345
>         
> Plan:
> == Abstract Syntax Tree ==
> LogicalProject(EXPR$0=[-($1, $0)])
> +- LogicalProject(a=[+($0, 5)], b=[+($0, 7)])
>    +- LogicalProject(r=[RAND()])
>       +- LogicalTableScan(table=[[default_catalog, default_database, 
> SmallTable3]])
> == Optimized Logical Plan ==
> Calc(select=[-(+(RAND(), 7), +(RAND(), 5)) AS EXPR$0])
> +- BoundedStreamScan(table=[[default_catalog, default_database, 
> SmallTable3]], fields=[a, b, c])
> {code}
> It seems that the projections are merged incorrectly. However if you run the 
> following test case in {{FlinkCalcMergeRuleTest.scala}}
> {code:scala}
> @Test
> def testCalcMergeWithRandomUdf(): Unit = {
>   val sqlQuery = "SELECT ts + a, ts + b FROM " +
>     "(SELECT a, b, random_udf(a) AS ts FROM MyTable WHERE a = b) t"
>   util.verifyRelPlan(sqlQuery)
> }
> {code}
> The result is
> {code:xml}
> <Root>
>   <TestCase name="testCalcMergeWithRandomUdf">
>     <Resource name="sql">
>       <![CDATA[SELECT ts + a, ts + b FROM (SELECT a, b, random_udf(a) AS ts 
> FROM MyTable WHERE a = b) t]]>
>     </Resource>
>     <Resource name="ast">
>       <![CDATA[
> LogicalProject(EXPR$0=[+(random_udf($0), $0)], EXPR$1=[+(random_udf($0), $1)])
> +- LogicalFilter(condition=[=($0, $1)])
>    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
> source: [TestTableSource(a, b, c)]]])
> ]]>
>     </Resource>
>     <Resource name="optimized rel plan">
>       <![CDATA[
> FlinkLogicalCalc(select=[+(random_udf(a), a) AS EXPR$0, +(random_udf(a), b) 
> AS EXPR$1], where=[=(a, b)])
> +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, 
> default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, 
> b, c])
> ]]>
>     </Resource>
>   </TestCase>
> </Root>
> {code}
> It seems that the plan is incorrect from the AST. So this seems to be a bug 
> in Calcite?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to