This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push: new 1088b87 [FLINK-22523][table-planner-blink] Window TVF should throw helpful exception when specifying offset parameter (#15803) 1088b87 is described below commit 1088b8726732d5121a40a88f38e2fe0bcefffb37 Author: Jark Wu <j...@apache.org> AuthorDate: Tue May 11 10:19:58 2021 +0800 [FLINK-22523][table-planner-blink] Window TVF should throw helpful exception when specifying offset parameter (#15803) --- .../functions/sql/SqlCumulateTableFunction.java | 5 +- .../planner/functions/sql/SqlHopTableFunction.java | 5 +- .../functions/sql/SqlTumbleTableFunction.java | 7 +-- .../plan/stream/sql/WindowTableFunctionTest.scala | 55 ++++++++++++++++++++++ 4 files changed, 67 insertions(+), 5 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java index f5e6c56..82eb710 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java @@ -57,6 +57,9 @@ public class SqlCumulateTableFunction extends SqlWindowTableFunction { if (!checkIntervalOperands(callBinding, 2)) { return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); } + if (callBinding.getOperandCount() == 5) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } // check time attribute return throwExceptionOrReturnFalse( checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure); @@ -66,7 +69,7 @@ public class SqlCumulateTableFunction extends SqlWindowTableFunction { public String getAllowedSignatures(SqlOperator op, String opName) { return opName + "(TABLE table_name, DESCRIPTOR(timecol), " - + "datetime interval, datetime interval[, datetime interval])"; + + "datetime interval, datetime interval)"; } } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java index 048a963..1d2a34f 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java @@ -57,6 +57,9 @@ public class SqlHopTableFunction extends SqlWindowTableFunction { if (!checkIntervalOperands(callBinding, 2)) { return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); } + if (callBinding.getOperandCount() == 5) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } // check time attribute return throwExceptionOrReturnFalse( checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure); @@ -66,7 +69,7 @@ public class SqlHopTableFunction extends SqlWindowTableFunction { public String getAllowedSignatures(SqlOperator op, String opName) { return opName + "(TABLE table_name, DESCRIPTOR(timecol), " - + "datetime interval, datetime interval[, datetime interval])"; + + "datetime interval, datetime interval)"; } } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java index b0f3cb4..c3149fd 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java @@ -55,6 +55,9 @@ public class SqlTumbleTableFunction extends SqlWindowTableFunction { if (!checkIntervalOperands(callBinding, 2)) { return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); } + if (callBinding.getOperandCount() == 4) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } // check time attribute return throwExceptionOrReturnFalse( checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure); @@ -62,9 +65,7 @@ public class SqlTumbleTableFunction extends SqlWindowTableFunction { @Override public String getAllowedSignatures(SqlOperator op, String opName) { - return opName - + "(TABLE table_name, DESCRIPTOR(timecol), datetime interval" - + "[, datetime interval])"; + return opName + "(TABLE table_name, DESCRIPTOR(timecol), datetime interval)"; } } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala index d3f93c8..640fade 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala @@ -140,4 +140,59 @@ class WindowTableFunctionTest extends TableTestBase { util.verifyExplain(sql) } + @Test + def testInvalidTumbleParameters(): Unit = { + val sql = + """ + |SELECT * + |FROM TABLE(TUMBLE( + | TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE, INTERVAL '5' MINUTE)) + |""".stripMargin + + thrown.expectMessage("Supported form(s): " + + "TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval)") + thrown.expect(classOf[ValidationException]) + util.verifyExplain(sql) + } + + @Test + def testInvalidHopParameters(): Unit = { + val sql = + """ + |SELECT * + |FROM TABLE( + | HOP( + | TABLE MyTable, + | DESCRIPTOR(rowtime), + | INTERVAL '1' MINUTE, + | INTERVAL '15' MINUTE, + | INTERVAL '5' MINUTE)) + |""".stripMargin + + thrown.expectMessage("Supported form(s): " + + "HOP(TABLE table_name, DESCRIPTOR(timecol), datetime interval, datetime interval)") + thrown.expect(classOf[ValidationException]) + util.verifyExplain(sql) + } + + @Test + def testInvalidCumulateParameters(): Unit = { + val sql = + """ + |SELECT * + |FROM TABLE( + | CUMULATE( + | TABLE MyTable, + | DESCRIPTOR(rowtime), + | INTERVAL '1' MINUTE, + | INTERVAL '15' MINUTE, + | INTERVAL '5' MINUTE)) + |""".stripMargin + + thrown.expectMessage("Supported form(s): " + + "CUMULATE(TABLE table_name, DESCRIPTOR(timecol), datetime interval, datetime interval)") + thrown.expect(classOf[ValidationException]) + util.verifyExplain(sql) + } + }