This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 35a2257af5d [FLINK-33248] Fix error message for CURRENT_WATERMARK without arguments 35a2257af5d is described below commit 35a2257af5d0cf9e671460b9770f1363b7a3d60c Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Wed Oct 11 15:20:30 2023 -0700 [FLINK-33248] Fix error message for CURRENT_WATERMARK without arguments - When CURRENT_WATERMARK is invoked without any arguments, it fails and returns an IndexOutOfBoundsException - This commit fixes it by handling for null and empty arguments thus returning a standard error message similar to other functions --- .../CurrentWatermarkInputTypeStrategy.java | 4 ++++ .../planner/runtime/stream/sql/CalcITCase.scala | 25 ++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java index 5d97eef250a..764f94339dc 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java @@ -54,6 +54,10 @@ class CurrentWatermarkInputTypeStrategy implements InputTypeStrategy { CallContext callContext, boolean throwOnFailure) { final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes(); + if (argumentDataTypes == null || argumentDataTypes.isEmpty()) { + return Optional.of(Collections.emptyList()); + } + final DataType dataType = argumentDataTypes.get(0); if (!LogicalTypeChecks.canBeTimeAttributeType(dataType.getLogicalType())) { return callContext.fail( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala index 2ee99119bd6..301496e37b6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala @@ -640,6 +640,31 @@ class CalcITCase extends StreamingTestBase { } } + @Test + def testCurrentWatermarkWithoutAnyAttribute(): Unit = { + val tableId = TestValuesTableFactory.registerData(Seq()) + tEnv.executeSql(s""" + |CREATE TABLE T ( + | ts TIMESTAMP_LTZ(3) + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$tableId', + | 'bounded' = 'true' + |) + """.stripMargin) + + try { + tEnv.sqlQuery("SELECT ts, CURRENT_WATERMARK() FROM T") + fail("CURRENT_WATERMARK without any attribute should have failed."); + } catch { + case e: Exception => + assertEquals( + "SQL validation failed. From line 1, column 12 to line 1, column 30: " + + "No match found for function signature CURRENT_WATERMARK()", + e.getMessage) + } + } + @Test def testCreateTemporaryTableFromDescriptor(): Unit = { val rows = Seq(row(42))