[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2023-03-14 Thread Krzysztof Chmielewski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17700264#comment-17700264
 ] 

Krzysztof Chmielewski commented on FLINK-26051:
---

Hi [~qingyue],
I wonder: isn't the issue that the compCnt (CPU cost) is wrongly calculated as 
zero for this query?
In other words, is the problem that compCnt is wrongly calculated as zero for 
this query/rule, or that a zero cost is wrongly handled later in the code?

With compCnt + 1 it seems that you add a bias value to every computation cost, 
so I'm not surprised that so many plans have changed and tests are failing.
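
A toy calculation of the point above (illustrative numbers only, not taken from the planner):
{code:scala}
// If compCnt is 0 for a projection-only Calc, its CPU cost component is 0, so plans
// that keep or merge such a Calc get identical costs and the planner's pick is arbitrary.
// The proposed "+ 1" biases every Calc's CPU cost by one unit per row.
object ZeroCpuCostExample extends App {
  val rowCnt  = 1000.0
  val compCnt = 0                              // projection-only Calc: no real computations
  val cpuWithoutBias = rowCnt * compCnt        // = 0.0
  val cpuWithBias    = rowCnt * (compCnt + 1)  // = 1000.0
  println(s"cpu without bias = $cpuWithoutBias, cpu with bias = $cpuWithBias")
}
{code}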

> one sql has row_number =1 and the subsequent SQL has "case when" and "where" 
> statement result Exception : The window can only be ordered in ASCENDING mode
> --
>
> Key: FLINK-26051
> URL: https://issues.apache.org/jira/browse/FLINK-26051
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.14.4
>Reporter: chuncheng wu
>Assignee: Jane Chan
>Priority: Major
> Attachments: image-2022-02-10-20-13-14-424.png, 
> image-2022-02-11-11-18-20-594.png, image-2022-06-17-21-28-54-886.png
>
>
> hello,
>    I have two SQL queries. One (sql0) is "select xx from (ROW_NUMBER statement) 
> where rn=1" and the other (sql1) is "select ${fields} 
> from result where ${filter_conditions}". The fields referenced in sql1 
> include one "case when" field. The two queries work well separately, but when 
> they are combined, the exception below is thrown. It happens when the 
> logical plan is translated into the physical plan:
>  
> {code:java}
> org.apache.flink.table.api.TableException: The window can only be ordered in 
> ASCENDING mode.
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
>     at 
> org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
>     at 
> com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
>     at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code}
> In the stack trace above, ROW_NUMBER()'s physical rel, which is normally 
> StreamExecRank, changes to StreamExecOverAggregate. The StreamExecOverAggregate 
> rel has a window of ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which I 
> never added. Oddly, if you 
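
A minimal sketch of the query shape described in the report above (illustration 
only; the table and field names are invented, and the real job uses the 
${fields}/${filter_conditions} templates):
{code:scala}
object ReproShapeSketch {
  // sql0: deduplication via ROW_NUMBER() ... WHERE rn = 1 (hypothetical table/columns)
  val sql0: String =
    """SELECT order_id, item_no, amount FROM (
      |  SELECT order_id, item_no, amount,
      |         ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY update_time DESC) AS rn
      |  FROM source_table
      |) WHERE rn = 1""".stripMargin

  // sql1: a projection containing a CASE WHEN field plus a WHERE filter on sql0's result
  val sql1: String =
    """SELECT order_id,
      |       CASE WHEN amount > 0 THEN amount ELSE 0 END AS positive_amount
      |FROM result
      |WHERE item_no IS NOT NULL""".stripMargin
}
{code}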

[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2023-03-13 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699929#comment-17699929
 ] 

Jane Chan commented on FLINK-26051:
---

[~KristoffSC], sorry for the late reply. Feel free to go ahead.

Here are some investigations I made earlier; I hope they help.

By changing CommonCalc#computeSelfCost to
{code:scala}
planner.getCostFactory.makeCost(newRowCnt, newRowCnt * (compCnt + 1), 0) {code}
this issue can be fixed.
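
A minimal standalone sketch of what this change amounts to (my own illustration, 
not the actual Flink source; the real CommonCalc derives newRowCnt from the 
metadata query and may special-case further expression kinds such as CASTs):
{code:scala}
import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexProgram}

import scala.collection.JavaConverters._

object CalcCostSketch {

  // Count only the expressions that actually compute something: plain input
  // references and literals are treated as free projections.
  def computationCount(calcProgram: RexProgram): Int =
    calcProgram.getExprList.asScala.count {
      case _: RexInputRef => false
      case _: RexLiteral  => false
      case _              => true
    }

  // The proposed cost shape: bias the CPU term by +1 per row so that a
  // projection-only Calc (computationCount == 0) is still costlier than no Calc.
  def selfCost(calcProgram: RexProgram, planner: RelOptPlanner, newRowCnt: Double): RelOptCost =
    planner.getCostFactory.makeCost(newRowCnt, newRowCnt * (computationCount(calcProgram) + 1), 0)
}
{code}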

Nonetheless, it causes ~100 test plans to change. Over 50% of the changed plans 
removed a Calc node that only contains a projection, and a few plans failed 
after applying the change.

I think the main blocker is evaluating the impact of this change to ensure 
there is no performance regression for the affected tests.

 


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2023-03-13 Thread Krzysztof Chmielewski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699685#comment-17699685
 ] 

Krzysztof Chmielewski commented on FLINK-26051:
---

Hi [~qingyue],
I would like to help with solving this issue.

Could you tell me where we are with this one?
Reading the comments, I'm not sure whether you have another branch/solution 
beyond the one proposed by [~zhangbinzaifendou], or whether you have something new.

I would like to work on this, but I would need some help getting started. 
I read your comment that "CommonCalc cannot produce the right CPU cost for 
computations." I see that the switch conditions there are really straightforward. 
Do you have an idea of what might be missing there?




[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-11-28 Thread Krzysztof Chmielewski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17640037#comment-17640037
 ] 

Krzysztof Chmielewski commented on FLINK-26051:
---

Hi :) [~qingyue]
Do you have any update on this one? :)


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-08-22 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17583305#comment-17583305
 ] 

Jane Chan commented on FLINK-26051:
---

Hi [~maver1ck], sorry for keeping you waiting. Changing the cost model has a 
huge impact on plan optimization, and there are some failing cases due to this 
change. I'm still working on it and will update ASAP.


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-08-03 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17574543#comment-17574543
 ] 

Maciej Bryński commented on FLINK-26051:


[~qingyue] Waiting for it :)


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-07-14 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1759#comment-1759
 ] 

Jane Chan commented on FLINK-26051:
---

Sorry for being late. I will open a PR ASAP.


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-06-29 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17560786#comment-17560786
 ] 

godfrey he commented on FLINK-26051:


[~qingyue] Thanks for the investigation; assigning it to you.


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-06-29 Thread Jane Chan (Jira)

Jane Chan commented on FLINK-26051:
---

Sorry for the late reply. The reason FlinkLogicalRankRuleForRangeEnd is not 
matched is the null condition (see FlinkLogicalRankRuleForRangeEnd#L177), which 
is caused by the following two nested Calc nodes.

FlinkLogicalSink(table=[default_catalog.default_database.oprms_order_base_order_item_one_group], fields=[order_id, item_no, parent_type, fed_sku_total_price, foodbox_total_price, update_time])
+- FlinkLogicalCalc(select=[order_id, item_no, parent_type, fed_sku_total_price, foodbox_total_price, CAST(LOCALTIMESTAMP() AS TIMESTAMP(6)) AS update_time])
   +- FlinkLogicalAggregate(group=[{0, 1, 2}], fed_sku_total_price=[SUM($3)], foodbox_total_price=[SUM($4)])
      +- FlinkLogicalCalc(select=[order_id, item_no_p2 AS item_no, parent_type_p2 AS parent_type, IF(=(parent_type_p1, 2), *(IF(=(is_combo, 1), spu_count, 1), original_total_price), 0) AS $f3, IF(=(parent_type_p1, 5), *(IF(=(is_combo, 1), spu_count, 1), original_total_price), 0) AS $f4], where=[AND(=(w0$o0, 1:BIGINT), ob_retract_flag, ot1_retract_flag, ot2_retract_flag)])
         +- FlinkLogicalCalc(select=[order_id, item_no_p2, parent_type_p2, is_combo, spu_count, original_total_price, parent_type_p1, ob_retract_flag, ot1_retract_flag, ot2_retract_flag, w0$o0])
            +- FlinkLogicalOverAggregate(window#0=[window(partition {0, 7} order by [12 DESC-nulls-last, 14 DESC-nulls-last, 15 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
               +- FlinkLogicalCalc(select=[order_id, item_no_p2, parent_type_p2, is_combo, spu_count, original_total_price, parent_type_p1, item_no_p1, update_time, ob_retract_flag, ot1_retract_flag, ot2_retract_flag, order_version, retract_flag, emit_time, emit_order], where=[retract_flag])
                  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, oprms_order_base_order_item_one]], fields=[order_id, item_no_p2, parent_type_p2, is_combo, spu_count, original_total_price, parent_type_p1, item_no_p1, update_time, ob_retract_flag, ot1_retract_flag, ot2_retract_flag, order_version, retract_flag, emit_time, emit_order])

The Calc nodes are not merged because CommonCalc cannot produce the right CPU 
cost for computations. cc godfrey he
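
A rough sketch of the mechanism described above (my own illustration with 
assumptions; this is not the rule's actual code): the rank rule needs the 
rn = 1 predicate to appear as the condition of the Calc sitting directly on top 
of the over-aggregate, and when the two Calc nodes are not merged that Calc is 
projection-only, so the check fails and the rule does not match.
{code:scala}
import org.apache.calcite.rex.RexProgram

object RankRuleConditionSketch {
  // Projection-only Calc => its RexProgram has no condition => no rank range can be
  // derived, so the FlinkLogicalOverAggregate survives into the physical plan and later
  // fails with "The window can only be ordered in ASCENDING mode".
  def rankPredicateAvailable(calcProgram: RexProgram): Boolean =
    calcProgram.getCondition != null
}
{code}
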
[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-06-27 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17559102#comment-17559102
 ] 

godfrey he commented on FLINK-26051:


[~qingyue] do you have any update?


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-06-20 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556603#comment-17556603
 ] 

godfrey he commented on FLINK-26051:


[~qingyue] Thanks for driving this


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-06-17 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555610#comment-17555610
 ] 

Jane Chan commented on FLINK-26051:
---

Sorry for joining this discussion late. I reproduced the example provided by 
[~zhangbinzaifendou] on the master branch. Interestingly, adding two extra 
filters,
ot1_retract_flag = true and ot2_retract_flag = true,
broke the plan. By diffing the logs, I found that 
FlinkLogicalRankRuleForRangeEnd is not matched due to the null condition.

So I guess the previous rewrite might have a bug. I'll keep updating once I 
find further proof.


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-06-16 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555383#comment-17555383
 ] 

godfrey he commented on FLINK-26051:


[~zhangbinzaifendou] Thanks for pushing the PR.


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-06-15 Thread zhangbin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554616#comment-17554616
 ] 

zhangbin commented on FLINK-26051:
--

Another SQL example that produces the same error:


{code:java}
@Test
public void testJoinUniqueKey4() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    StatementSet statementSet = tableEnv.createStatementSet();
    tableEnv.executeSql("CREATE TABLE oprms_order_base_order_item_one ("
            + "order_id BIGINT, item_no_p2 VARCHAR, parent_type_p2 INTEGER, is_combo INTEGER, "
            + "spu_count INTEGER, original_total_price BIGINT, parent_type_p1 INTEGER, "
            + "item_no_p1 VARCHAR, update_time TIMESTAMP, ob_retract_flag BOOLEAN, "
            + "ot1_retract_flag BOOLEAN, ot2_retract_flag BOOLEAN, order_version INTEGER, "
            + "retract_flag BOOLEAN, emit_time BIGINT, emit_order INTEGER"
            + ") WITH ('connector'='datagen')");
    tableEnv.executeSql("CREATE TABLE oprms_order_base_order_item_one_group ("
            + "order_id BIGINT, item_no VARCHAR, parent_type INTEGER, fed_sku_total_price BIGINT, "
            + "foodbox_total_price BIGINT, update_time TIMESTAMP"
            + ") WITH ('connector'='blackhole')");
    tableEnv.executeSql("create view oprms_order_base_order_item_one_view as "
            + "select * from ("
            + " select * ,row_number() over( partition by order_id,item_no_p1 order by order_version desc, emit_time desc, emit_order desc) rk "
            + " from oprms_order_base_order_item_one"
            + " where retract_flag=true "
            + ") a "
            + " where rk=1 and ob_retract_flag=true ");
            //+ " and ot1_retract_flag=true and ot2_retract_flag=true");
    statementSet.addInsertSql("insert into oprms_order_base_order_item_one_group\n"
            + "select a.order_id ,a.item_no_p2 as item_no ,a.parent_type_p2 as parent_type ,"
            + "sum(if(a.parent_type_p1 = 2, if(a.is_combo=1, a.spu_count, 1)*a.original_total_price, 0)) as fed_sku_total_price ,"
            + "sum(if(a.parent_type_p1 = 5, if(a.is_combo=1, a.spu_count, 1)*a.original_total_price, 0)) as foodbox_total_price ,"
            + "localtimestamp AS update_time "
            + "from oprms_order_base_order_item_one_view a "
            + "group by a.order_id ,a.item_no_p2 ,a.parent_type_p2");

    System.out.println(statementSet.explain(ExplainDetail.CHANGELOG_MODE));
}
{code}


The same error occurs when the commented-out condition is added back:
//+ " and ot1_retract_flag=true and ot2_retract_flag=true");




[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-06-15 Thread zhangbin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554606#comment-17554606
 ] 

zhangbin commented on FLINK-26051:
--

[https://github.com/apache/flink/pull/19969]

cc [~godfreyhe] 


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-06-13 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17553587#comment-17553587
 ] 

Jing Ge commented on FLINK-26051:
-

[~maver1ck] would you like to share your concerns about the current solution, along with your requirements and the expected solution? Thanks.


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-05-19 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539453#comment-17539453
 ] 

Maciej Bryński commented on FLINK-26051:


[~xuyangzhong]
Do we have any better fix than changing the order of the Flink rules?


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-03-17 Thread chuncheng wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508089#comment-17508089
 ] 

chuncheng wu commented on FLINK-26051:
--

There is a simple demo with a similar bug; the core SQL is:

{code:java}
Table task_latest = tableEnv.sqlQuery("SELECT\n" +
        "task_id,\n" +
        "dim_id,\n" +
        "update_time,\n" +
        "update_order,\n" +
        "rn\n" +
        "from\n" +
        "(\n" +
        "select *,row_number() over(partition by task_id order by update_time desc,update_order desc) as rn\n" +
        "FROM task\n" +
        ")temp \n");
{code}

The full program is:
{code:java}
package com.meituan.grocery.data.flink.test;

import com.meituan.grocery.data.flink.verify.codeExchange.udf.RichPoiV2WithCache;
import com.meituan.grocery.data.flink.verify.codeExchange.udf.pojo.PoiVersion2;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

public class DebugRnRetractWcc {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000 * 60 * 5);
        env.getCheckpointConfig().setCheckpointTimeout(1000 * 60 * 10);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
        TableConfig config = tableEnv.getConfig();
        config.setIdleStateRetentionTime(Time.hours(96), Time.hours(120));

        tableEnv.createTemporaryView("task", tableEnv.fromTableSource(new Task()));
        tableEnv.createTemporaryView("task_detail", tableEnv.fromTableSource(new TaskDetail()));
        // tableEnv.registerFunction("richPoi", new RichPoiV2WithCache());

        Table task_latest = tableEnv.sqlQuery("SELECT\n" +
                "task_id,\n" +
                "dim_id,\n" +
                "update_time,\n" +
                "update_order,\n" +
                "rn\n" +
                "from\n" +
                "(\n" +
                "select *,row_number() over(partition by task_id order by update_time desc,update_order desc) as rn\n" +
                "FROM task\n" +
                ")temp \n");

        DataStream<Tuple2<Boolean, Row>> stream = tableEnv.toRetractStream(task_latest, Row.class);
        stream.print();
        StreamGraph sg = env.getStreamGraph("test", false);
        JobGraph jg = sg.getJobGraph();
        // System.out.println(env.getExecutionPlan());

        env.execute("Debug");
    }

    public static class Task implements StreamTableSource<Row> {

        private int totalCol = 4;
        private String[] fieldNames = new String[]{"task_id", "dim_id", "update_time", "update_order"};
        public DataType[] fieldDataTypes = new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.INT()};
        public DataType[] fieldDataTypes3 = new DataType[]{DataTypes.STRING()};
        public DataType[] fieldDataTypes2 = new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.TIMESTAMP(), TypeConversions.fromLegacyInfoToDataType(TypeInformation.of(PoiVersion2.class))};
        private TypeInformation[] typeInformations = new TypeInformation[]{
                TypeConversions.fromDataTypeToLegacyInfo(DataTypes.INT()),
                TypeConversions.fromDataTypeToLegacyInfo(DataTypes.INT()),
                TypeConversions.fromDataTypeToLegacyInfo(DataTypes.BIGINT()),
                TypeConversions.fromDataTypeToLegacyInfo(DataTypes.INT())
        };

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
            return env.addSource(new SourceFunction<Row>() {
                @Override
                public void run(SourceContext<Row> ctx) throws Exception {
                    ctx.collect(Row.of(20210624, 10, 1645367017355L, 9));
                    ctx.collect(Row.of(20210624, 11, 1645367017355L, 10));
                    ctx.collect(Row.of(20210624, 12, 1645367017311L, 8));
                    ctx.collect(Row.of(20210625, 13, 1645367017211L, 7));
                }

                @Override
                public void cancel() {
                }
            }).returns(new RowTypeInfo(typeInformations, fieldNames));
        }

        @Override
        public DataType getProducedDataType() {
            DataTypes.Field[] fields = new DataTypes.Field[totalCol];
            for (int i = 0; i < totalCol; i++) {
                fields[i] = DataTypes.FIELD(fieldNames[i], fieldDataTypes[i]);
            }
            return DataTypes.ROW(fields);
        }


        @Override
        public TableSchema 

[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-02-28 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17499312#comment-17499312
 ] 

xuyang commented on FLINK-26051:


Hi, thanks for reporting this bug! Would you like to fix it, [~zhangbinzaifendou]?


[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2022-02-10 Thread zhangbinzaifendou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17490165#comment-17490165
 ] 

zhangbinzaifendou commented on FLINK-26051:
---

Debugging the test with Flink 1.12.2 in the IDE, the debug log shows that the logical plan before applying the FlinkLogicalRankRuleForRangeEnd rule differs between the two variants.

Before applying FlinkLogicalRankRuleForRangeEnd:
-- with case when
FlinkLogicalCalc(select=[biz_bill_no, CASE(OR(=(task_mode, 51), AND(=(task_mode, 40), >=(total_stage_num, 2), >=(current_stage_index, 2), =(use_pre_task_owner, 1))), parent_task_no, sowing_task_no) AS parent_task_no_cw, parent_task_no, sowing_task_no, task_type, task_mode, total_stage_num, current_stage_index, use_pre_task_owner], where=[AND(SEARCH(w0$o0, Sarg[1L:BIGINT]:BIGINT), SEARCH(task_type, Sarg[21]), SEARCH(task_mode, Sarg[40, 51]), SEARCH(poi_type, Sarg[2]), SEARCH(biz_origin_bill_type, Sarg[(-∞..111), (111..112), (112..113), (113..114), (114..+∞)]))])
+- FlinkLogicalCalc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, w0$o0])
   +- FlinkLogicalOverAggregate(window#0=[window(partition \{10, 11} order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
      +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]])

This plan does not match FlinkLogicalRankRuleForRangeEnd, but it does match FlinkCalcMergeRule, which is applied after FlinkLogicalRankRuleForRangeEnd.

Optimization result:
FlinkLogicalCalc(select=[biz_bill_no, CASE(OR(=(task_mode, 51), AND(=(task_mode, 40), >=(total_stage_num, 2), >=(current_stage_index, 2), =(use_pre_task_owner, 1))), parent_task_no, sowing_task_no) AS parent_task_no_cw, parent_task_no, sowing_task_no, task_type, task_mode, total_stage_num, current_stage_index, use_pre_task_owner], where=[AND(SEARCH(w0$o0, Sarg[1L:BIGINT]:BIGINT), SEARCH(task_type, Sarg[21]), SEARCH(task_mode, Sarg[40, 51]), SEARCH(poi_type, Sarg[2]), SEARCH(biz_origin_bill_type, Sarg[(-∞..111), (111..112), (112..113), (113..114), (114..+∞)]))])
+- FlinkLogicalOverAggregate(window#0=[window(partition \{10, 11} order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
   +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]])

 

-- without case when
FlinkLogicalCalc(select=[biz_bill_no, parent_task_no, sowing_task_no, task_type, task_mode, total_stage_num, current_stage_index, use_pre_task_owner], where=[AND(SEARCH(w0$o0, Sarg[1L:BIGINT]:BIGINT), SEARCH(task_type, Sarg[21]), SEARCH(task_mode, Sarg[40, 51]), SEARCH(poi_type, Sarg[2]), SEARCH(biz_origin_bill_type, Sarg[(-∞..111), (111..112), (112..113), (113..114), (114..+∞)]))])
+- FlinkLogicalOverAggregate(window#0=[window(partition \{10, 11} order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
   +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]])

This plan matches FlinkLogicalRankRuleForRangeEnd.

Optimization result:
FlinkLogicalCalc(select=[biz_bill_no, parent_task_no, sowing_task_no, task_type, task_mode, total_stage_num, current_stage_index, use_pre_task_owner], where=[AND(AND(AND(AND(AND(AND(=(task_type, 21), OR(=(task_mode, 40), =(task_mode, 51))), =(poi_type, 2)), <>(biz_origin_bill_type, 111)), <>(biz_origin_bill_type, 112)), <>(biz_origin_bill_type, 113)), <>(biz_origin_bill_type, 114))])
+- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt,sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id])
   +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]])


The SQL with CASE WHEN is converted to FlinkLogicalOverAggregate, while the SQL without CASE WHEN is converted to FlinkLogicalRank.
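To make the comparison concrete, here is a minimal sketch of the two query shapes being compared. The table "t", its columns, and the filter are hypothetical stand-ins (not the reporter's exact SQL), and a StreamTableEnvironment named tableEnv is assumed as in the examples earlier in this thread; both queries deduplicate with ROW_NUMBER() ... = 1, and the only difference is the CASE WHEN in the outer projection.

{code:java}
// Hypothetical table "t"; only the shape of the two queries matters here.

// Shape A: the outer SELECT wraps a CASE WHEN around the rn = 1 filter.
// In the plans above, this shape keeps the FlinkLogicalOverAggregate and later
// fails with "The window can only be ordered in ASCENDING mode".
Table withCaseWhen = tableEnv.sqlQuery(
        "SELECT CASE WHEN task_mode = 51 THEN parent_task_no ELSE sowing_task_no END AS pno "
        + "FROM ("
        + "  SELECT *, ROW_NUMBER() OVER (PARTITION BY dt ORDER BY task_type DESC) AS rn FROM t"
        + ") tmp WHERE rn = 1 AND poi_type = 2");

// Shape B: same subquery and filter, but no CASE WHEN in the outer SELECT.
// This shape is rewritten into FlinkLogicalRank (deduplication) as expected.
Table withoutCaseWhen = tableEnv.sqlQuery(
        "SELECT parent_task_no, sowing_task_no "
        + "FROM ("
        + "  SELECT *, ROW_NUMBER() OVER (PARTITION BY dt ORDER BY task_type DESC) AS rn FROM t"
        + ") tmp WHERE rn = 1 AND poi_type = 2");
{code}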

The problem was solved when I added FlinkCalcMergeRule.INSTANCE before FlinkLogicalRankRule.INSTANCE in the LOGICAL_REWRITE rule set of FlinkStreamRuleSets.java (see the sketch below).
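For illustration only, a minimal sketch of what that reordering amounts to. It assumes the rule set is assembled with Calcite's RuleSets.ofList, treats FlinkCalcMergeRule.INSTANCE and FlinkLogicalRankRule.INSTANCE as plain static members, and guesses the planner package for the imports; the actual contents and layout of FlinkStreamRuleSets differ between Flink versions.

{code:java}
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;
// Assumed package; check the flink-table-planner module of your Flink version.
import org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule;
import org.apache.flink.table.planner.plan.rules.logical.FlinkLogicalRankRule;

class LogicalRewriteOrderSketch {
    // Hypothetical excerpt, not the real FlinkStreamRuleSets contents.
    // Only the relative order matters: merging the adjacent Calc nodes first
    // exposes the ROW_NUMBER() ... = 1 pattern again, so the rank rule can
    // still rewrite it into a Rank instead of leaving an OverAggregate behind.
    static final RuleSet LOGICAL_REWRITE = RuleSets.ofList(
            FlinkCalcMergeRule.INSTANCE,      // merge adjacent Calc nodes first
            FlinkLogicalRankRule.INSTANCE     // then rewrite the ROW_NUMBER() pattern into a Rank
            // ... the remaining rewrite rules of the real rule set would follow ...
    );
}
{code}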

!image-2022-02-10-20-13-14-424.png|width=645,height=384!

[~godfreyhe] [~jark] 
