twalthr commented on code in PR #26840:
URL: https://github.com/apache/flink/pull/26840#discussion_r2239784927


##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml:
##########
@@ -352,50 +352,50 @@ Calc(select=[c, w0$o0 AS $1])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRowTimeBoundedPartitionedRangeOver">
+  <TestCase name="testRowTimeBoundedNonPartitionedRowsOver">
     <Resource name="sql">
       <![CDATA[
 SELECT c,
-    COUNT(a) OVER (PARTITION BY c ORDER BY rowtime
-        RANGE BETWEEN INTERVAL '1' SECOND  PRECEDING AND CURRENT ROW)
-    FROM MyTable
+    COUNT(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)
+FROM MyTable
       ]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(c=[$2], EXPR$1=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 
NULLS FIRST RANGE 1000:INTERVAL SECOND PRECEDING)])
+LogicalProject(c=[$2], EXPR$1=[COUNT($0) OVER (ORDER BY $4 NULLS FIRST ROWS 5 
PRECEDING)])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, w0$o0 AS $1])
-+- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 
1000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0])
-   +- Exchange(distribution=[hash[c]])
++- OverAggregate(orderBy=[rowtime ASC], window=[ ROWS BETWEEN 5 PRECEDING AND 
CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0])
+   +- Exchange(distribution=[single])
       +- Calc(select=[a, c, rowtime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRowTimeBoundedNonPartitionedRowsOver">
+  <TestCase name="testRowTimeBoundedPartitionedRangeOver">
     <Resource name="sql">
       <![CDATA[
 SELECT c,
-    COUNT(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)
-FROM MyTable
+    COUNT(a) OVER (PARTITION BY c ORDER BY rowtime
+        RANGE BETWEEN INTERVAL '1' SECOND  PRECEDING AND CURRENT ROW)
+    FROM MyTable
       ]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(c=[$2], EXPR$1=[COUNT($0) OVER (ORDER BY $4 NULLS FIRST ROWS 5 
PRECEDING)])
+LogicalProject(c=[$2], EXPR$1=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 
NULLS FIRST RANGE 1000:INTERVAL SECOND PRECEDING)])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, w0$o0 AS $1])
-+- OverAggregate(orderBy=[rowtime ASC], window=[ ROWS BETWEEN 5 PRECEDING AND 
CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0])
-   +- Exchange(distribution=[single])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 
1000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0])

Review Comment:
   ```suggestion
   +- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG 
BETWEEN 1000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS 
w0$o0])
   ```
   How is a typo `RANG` possible here?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala:
##########
@@ -480,4 +482,31 @@ class OverAggregateTest extends TableTestBase {
     util.verifyExecPlan(sql)
   }
 
+  @Test
+  def testWindowBoundaryNotNumeric(): Unit = {
+    val sql =
+      """
+        |SELECT c,
+        |    COUNT(a) OVER (PARTITION BY b ORDER BY proctime
+        |        ROWS BETWEEN '2' PRECEDING AND CURRENT ROW) AS cnt1
+        |FROM MyTable
+      """.stripMargin
+
+    assertThatThrownBy(() => util.verifyExecPlan(sql))
+      .hasRootCauseInstanceOf(classOf[ValidationException])

Review Comment:
   Please also match against a root cause. Not just the pure type which could 
be anything.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala:
##########
@@ -592,11 +594,25 @@ object RelExplainUtil {
       window.getRowType.getFieldCount - window.groups.flatMap(_.aggCalls).size
     }
 
+    def calcBoundOffset(bound: RexWindowBound, window: Window): AnyRef = {
+      val ref = bound.getOffset.asInstanceOf[RexInputRef]
+      val boundIndex = ref.getIndex - calcOriginInputRows(window)
+      if (window.constants.isEmpty || boundIndex >= window.constants.size) {
+        throw new ValidationException("Expressions for window boundary are not 
allowed")

Review Comment:
   It looks like the RexSimplify kicks in, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to