This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b37c98599b3 [FLINK-37610] Check if `ORDER BY` keys exist before
accessing
b37c98599b3 is described below
commit b37c98599b34aa62ee0da4edda6ba422f73766ed
Author: Bonnie Varghese <[email protected]>
AuthorDate: Thu Apr 10 00:53:32 2025 -0700
[FLINK-37610] Check if `ORDER BY` keys exist before accessing
- If the `ORDER BY` clause is not specified in an Over agg, it would lead to
an IndexOutOfBoundsException
- This commit adds a check to ensure `ORDER BY` fields exist before
accessing them
---
.../FlinkChangelogModeInferenceProgram.scala | 26 ++++++++++++----------
.../plan/stream/sql/agg/OverAggregateTest.xml | 23 +++++++++++++++++++
.../plan/stream/sql/agg/OverAggregateTest.scala | 13 +++++++++++
3 files changed, 50 insertions(+), 12 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index a569d5b64c2..8b128fccb71 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -309,19 +309,21 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
val builder = ModifyKindSet
.newBuilder()
.addContainedKind(ModifyKind.INSERT)
+ val groups = over.logicWindow.groups
- // All aggregates are computed over the same window and order by is
supported for only 1 field
- val orderKeyIndex =
-
over.logicWindow.groups.get(0).orderKeys.getFieldCollations.get(0).getFieldIndex
- val orderKeyType =
over.logicWindow.getRowType.getFieldList.get(orderKeyIndex).getType
- if (
- !FlinkTypeFactory.isRowtimeIndicatorType(orderKeyType)
- && !FlinkTypeFactory.isProctimeIndicatorType(orderKeyType)
- ) {
- // Only non row-time/proc-time sort can support UPDATES
- builder.addContainedKind(ModifyKind.UPDATE)
- builder.addContainedKind(ModifyKind.DELETE)
- overRequiredTrait = ModifyKindSetTrait.ALL_CHANGES
+ if (!groups.isEmpty &&
!groups.get(0).orderKeys.getFieldCollations.isEmpty) {
+ // All aggregates are computed over the same window and order by is
supported for only 1 field
+ val orderKeyIndex =
groups.get(0).orderKeys.getFieldCollations.get(0).getFieldIndex
+ val orderKeyType =
over.logicWindow.getRowType.getFieldList.get(orderKeyIndex).getType
+ if (
+ !FlinkTypeFactory.isRowtimeIndicatorType(orderKeyType)
+ && !FlinkTypeFactory.isProctimeIndicatorType(orderKeyType)
+ ) {
+ // Only non row-time/proc-time sort can support UPDATES
+ builder.addContainedKind(ModifyKind.UPDATE)
+ builder.addContainedKind(ModifyKind.DELETE)
+ overRequiredTrait = ModifyKindSetTrait.ALL_CHANGES
+ }
}
val children = visitChildren(over, overRequiredTrait)
val providedTrait = new ModifyKindSetTrait(builder.build())
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
index 72e3bffb228..d28e2818592 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
@@ -526,4 +526,27 @@ Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1,
null:INTEGER) AS cnt2])
]]>
</Resource>
</TestCase>
+ <TestCase name="testWithoutOrderByClause">
+ <Resource name="sql">
+ <![CDATA[
+SELECT c,
+ COUNT(a) OVER (PARTITION BY c) AS cnt1
+FROM MyTable
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY $2)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[c, w0$o0 AS $1])
++- OverAggregate(partitionBy=[c], orderBy=[], window=[ RANG BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, proctime, rowtime,
COUNT(a) AS w0$o0])
+ +- Exchange(distribution=[hash[c]])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
index beb16551af9..86c9284481f 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
@@ -467,4 +467,17 @@ class OverAggregateTest extends TableTestBase {
|)
|""".stripMargin)
}
+
+ @Test
+ def testWithoutOrderByClause(): Unit = {
+ val sql =
+ """
+ |SELECT c,
+ | COUNT(a) OVER (PARTITION BY c) AS cnt1
+ |FROM MyTable
+ """.stripMargin
+
+ util.verifyExecPlan(sql)
+ }
+
}