This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c2d9e69  [FLINK-21733][table-planner-blink] WatermarkAssigner incorrectly recomputing the rowtime index which may cause ArrayIndexOutOfBoundsException (#15150)
c2d9e69 is described below

commit c2d9e69450e0a1bf14aa866ecdd8f5c38072923d
Author: lincoln lee <lincoln.8...@gmail.com>
AuthorDate: Fri Mar 12 15:20:22 2021 +0800

    [FLINK-21733][table-planner-blink] WatermarkAssigner incorrectly recomputing the rowtime index which may cause ArrayIndexOutOfBoundsException (#15150)
---
 .../plan/nodes/calcite/WatermarkAssigner.scala     |  6 +-----
 .../plan/stream/sql/SourceWatermarkTest.xml        | 18 +++++++++++++++++
 .../plan/stream/sql/SourceWatermarkTest.scala      | 23 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
index 99756bf..459981a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
@@ -65,11 +65,7 @@ abstract class WatermarkAssigner(
   }
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
-    val rowtimeFieldName = inputRel.getRowType.getFieldNames.get(rowtimeFieldIndex)
-    val newInputRel = inputs.get(0)
-    // the input fields maybe reordered, re-computed the rowtime index
-    val newIndex = newInputRel.getRowType.getFieldNames.indexOf(rowtimeFieldName)
-    copy(traitSet, newInputRel, newIndex, watermarkExpr)
+    copy(traitSet, inputs.get(0), rowtimeFieldIndex, watermarkExpr)
   }
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
index d2d5632..93b8d8b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
@@ -130,4 +130,22 @@ Calc(select=[a, b], where=[(b > 10)])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testProjectTransposeWatermarkAssigner">
+    <Resource name="sql">
+      <![CDATA[SELECT a, b, ts FROM t1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], ts=[$5])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($5, 10000:INTERVAL SECOND)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], t=[$4], ts=[$4])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t1]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, t1, project=[a, b, t], watermark=[-($2, 10000:INTERVAL SECOND)]]], fields=[a, b, t])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala
index 54d657d..cc0c24c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala
@@ -137,4 +137,27 @@ class SourceWatermarkTest extends TableTestBase {
   def testWatermarkWithMetadata(): Unit = {
     util.verifyExecPlan("SELECT a, b FROM MyTable")
   }
+
+  @Test
+  def testProjectTransposeWatermarkAssigner(): Unit = {
+    val sourceDDL =
+      s"""
+         |CREATE TEMPORARY TABLE `t1` (
+         |  `a`  VARCHAR,
+         |  `b`  VARCHAR,
+         |  `c`  VARCHAR,
+         |  `d`  INT,
+         |  `t`  TIMESTAMP(3),
+         |  `ts` AS `t`,
+         |  WATERMARK FOR `ts` AS `ts`  - INTERVAL '10' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'enable-watermark-push-down' = 'true',
+         |  'bounded' = 'false',
+         |  'disable-lookup' = 'true'
+         |)
+       """.stripMargin
+    util.tableEnv.executeSql(sourceDDL)
+    util.verifyExecPlan("SELECT a, b, ts FROM t1")
+  }
 }

Reply via email to