This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c2d9e69 [FLINK-21733][table-planner-blink] WatermarkAssigner incorrectly recomputing the rowtime index which may cause ArrayIndexOutOfBoundsException (#15150) c2d9e69 is described below commit c2d9e69450e0a1bf14aa866ecdd8f5c38072923d Author: lincoln lee <lincoln.8...@gmail.com> AuthorDate: Fri Mar 12 15:20:22 2021 +0800 [FLINK-21733][table-planner-blink] WatermarkAssigner incorrectly recomputing the rowtime index which may cause ArrayIndexOutOfBoundsException (#15150) --- .../plan/nodes/calcite/WatermarkAssigner.scala | 6 +----- .../plan/stream/sql/SourceWatermarkTest.xml | 18 +++++++++++++++++ .../plan/stream/sql/SourceWatermarkTest.scala | 23 ++++++++++++++++++++++ 3 files changed, 42 insertions(+), 5 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala index 99756bf..459981a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala @@ -65,11 +65,7 @@ abstract class WatermarkAssigner( } override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - val rowtimeFieldName = inputRel.getRowType.getFieldNames.get(rowtimeFieldIndex) - val newInputRel = inputs.get(0) - // the input fields maybe reordered, re-computed the rowtime index - val newIndex = newInputRel.getRowType.getFieldNames.indexOf(rowtimeFieldName) - copy(traitSet, newInputRel, newIndex, watermarkExpr) + copy(traitSet, inputs.get(0), rowtimeFieldIndex, watermarkExpr) } /** diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml index d2d5632..93b8d8b 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml @@ -130,4 +130,22 @@ Calc(select=[a, b], where=[(b > 10)]) ]]> </Resource> </TestCase> + <TestCase name="testProjectTransposeWatermarkAssigner"> + <Resource name="sql"> + <![CDATA[SELECT a, b, ts FROM t1]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], ts=[$5]) ++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($5, 10000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], t=[$4], ts=[$4]) + +- LogicalTableScan(table=[[default_catalog, default_database, t1]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +TableSourceScan(table=[[default_catalog, default_database, t1, project=[a, b, t], watermark=[-($2, 10000:INTERVAL SECOND)]]], fields=[a, b, t]) +]]> + </Resource> + </TestCase> </Root> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala index 54d657d..cc0c24c 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala @@ -137,4 +137,27 @@ class SourceWatermarkTest extends TableTestBase { def 
testWatermarkWithMetadata(): Unit = { util.verifyExecPlan("SELECT a, b FROM MyTable") } + + @Test + def testProjectTransposeWatermarkAssigner(): Unit = { + val sourceDDL = + s""" + |CREATE TEMPORARY TABLE `t1` ( + | `a` VARCHAR, + | `b` VARCHAR, + | `c` VARCHAR, + | `d` INT, + | `t` TIMESTAMP(3), + | `ts` AS `t`, + | WATERMARK FOR `ts` AS `ts` - INTERVAL '10' SECOND + |) WITH ( + | 'connector' = 'values', + | 'enable-watermark-push-down' = 'true', + | 'bounded' = 'false', + | 'disable-lookup' = 'true' + |) + """.stripMargin + util.tableEnv.executeSql(sourceDDL) + util.verifyExecPlan("SELECT a, b, ts FROM t1") + } }