This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 35a2257af5d [FLINK-33248] Fix error message for CURRENT_WATERMARK without arguments
35a2257af5d is described below

commit 35a2257af5d0cf9e671460b9770f1363b7a3d60c
Author: bvarghese1 <bvargh...@confluent.io>
AuthorDate: Wed Oct 11 15:20:30 2023 -0700

    [FLINK-33248] Fix error message for CURRENT_WATERMARK without arguments
    
    - When CURRENT_WATERMARK is invoked without any arguments, it fails and returns
      an IndexOutOfBoundsException
    - This commit fixes it by handling for null and empty arguments thus returning
      a standard error message similar to other functions
---
 .../CurrentWatermarkInputTypeStrategy.java         |  4 ++++
 .../planner/runtime/stream/sql/CalcITCase.scala    | 25 ++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java
index 5d97eef250a..764f94339dc 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java
@@ -54,6 +54,10 @@ class CurrentWatermarkInputTypeStrategy implements InputTypeStrategy {
             CallContext callContext, boolean throwOnFailure) {
         final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
 
+        if (argumentDataTypes == null || argumentDataTypes.isEmpty()) {
+            return Optional.of(Collections.emptyList());
+        }
+
         final DataType dataType = argumentDataTypes.get(0);
         if (!LogicalTypeChecks.canBeTimeAttributeType(dataType.getLogicalType())) {
             return callContext.fail(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 2ee99119bd6..301496e37b6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -640,6 +640,31 @@ class CalcITCase extends StreamingTestBase {
     }
   }
 
+  @Test
+  def testCurrentWatermarkWithoutAnyAttribute(): Unit = {
+    val tableId = TestValuesTableFactory.registerData(Seq())
+    tEnv.executeSql(s"""
+                       |CREATE TABLE T (
+                       |  ts TIMESTAMP_LTZ(3)
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'data-id' = '$tableId',
+                       |  'bounded' = 'true'
+                       |)
+       """.stripMargin)
+
+    try {
+      tEnv.sqlQuery("SELECT ts, CURRENT_WATERMARK() FROM T")
+      fail("CURRENT_WATERMARK without any attribute should have failed.");
+    } catch {
+      case e: Exception =>
+        assertEquals(
+          "SQL validation failed. From line 1, column 12 to line 1, column 30: " +
+            "No match found for function signature CURRENT_WATERMARK()",
+          e.getMessage)
+    }
+  }
+
   @Test
   def testCreateTemporaryTableFromDescriptor(): Unit = {
     val rows = Seq(row(42))

Reply via email to