This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b0aaf30d30 [FLINK-39278][table] Indexing of columns in `CREATE [MATERIALIZED ]TABLE` should start with 1 in error messages
9b0aaf30d30 is described below

commit 9b0aaf30d308ae58920aade8f6a789f6e134bc02
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Mar 20 21:01:36 2026 +0100

    [FLINK-39278][table] Indexing of columns in `CREATE [MATERIALIZED ]TABLE` should start with 1 in error messages
---
 .../MaterializedTableChangeHandler.java            |  4 +-
 .../table/planner/connectors/DynamicSinkUtils.java |  2 +-
 .../operations/converters/SchemaBuilderUtil.java   |  2 +-
 .../operations/SqlCTASNodeToOperationTest.java     |  2 +-
 ...erializedTableNodeToOperationConverterTest.java | 64 ++++++++++++----------
 .../SqlRTASNodeToOperationConverterTest.java       |  2 +-
 .../table/planner/catalog/CatalogTableITCase.scala |  2 +-
 7 files changed, 43 insertions(+), 35 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
index 7acde833570..8602812b6c3 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
@@ -345,7 +345,7 @@ public class MaterializedTableChangeHandler {
                             "When modifying the query of a materialized table, 
"
                                     + "currently only support appending 
columns at the end of original schema, dropping, renaming, and reordering 
columns are not supported.\n"
                                     + "Column mismatch at position %d: 
Original column is [%s], but new column is [%s].",
-                            oldPosition, column, column));
+                            oldPosition + 1, column, column));
         }
 
         ColumnPosition position = columnWithChangedPosition.getNewPosition();
@@ -525,7 +525,7 @@ public class MaterializedTableChangeHandler {
                         "When modifying the query of a materialized table, "
                                 + "currently only support appending columns at 
the end of original schema, dropping, renaming, and reordering columns are not 
supported.\n"
                                 + "Column mismatch at position %d: Original 
column is [%s], but new column is [%s].",
-                        position, oldColumn, newColumn));
+                        position + 1, oldColumn, newColumn));
     }
 
     private void setColumnAtPosition(UnresolvedColumn column, ColumnPosition 
position) {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index b2cc0b8e825..219b995864f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -420,7 +420,7 @@ public final class DynamicSinkUtils {
                 throw createSchemaMismatchException(
                         String.format(
                                 "Incompatible types for sink column '%s' at 
position %s.",
-                                sinkFields.get(i).getName(), i),
+                                sinkFields.get(i).getName(), i + 1),
                         tableDebugName,
                         queryFields,
                         sinkFields);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java
index cfb5fb8abdb..356114b614c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java
@@ -313,7 +313,7 @@ public class SchemaBuilderUtil {
                             "Incompatible types for sink column '%s' at 
position %d. "
                                     + "The source column has type '%s', "
                                     + "while the target column has type '%s'.",
-                            columnName, columnPos, sourceColumnType, 
sinkColumnType));
+                            columnName, columnPos + 1, sourceColumnType, 
sinkColumnType));
         }
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
index 3add4e37f46..d610569bf95 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
@@ -342,7 +342,7 @@ class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTestBase {
         assertThatThrownBy(() -> parseAndConvert(sql))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Incompatible types for sink column 'f0' at position 
0. "
+                        "Incompatible types for sink column 'f0' at position 
1. "
                                 + "The source column has type 'INT NOT NULL', 
while the target "
                                 + "column has type 'BOOLEAN'.");
     }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index bcb1705eb03..00246e79719 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -691,30 +691,38 @@ class SqlMaterializedTableNodeToOperationConverterTest
         assertThat(materializedTable.getOrigin()).isEqualTo(expected);
     }
 
-    @Test
-    void testCreateOrAlterMaterializedTableWithChangingRefreshMode() {
-        // Changing refresh mode is not supported
-        final String sql =
-                "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n"
-                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
-                        + ")\n"
-                        + "COMMENT 'materialized table comment'\n"
-                        + "PARTITIONED BY (a, d)\n"
-                        + "WITH (\n"
-                        + "  'connector' = 'filesystem', \n"
-                        + "  'format' = 'json'\n"
-                        + ")\n"
-                        + "FRESHNESS = INTERVAL '30' SECOND\n"
-                        + "REFRESH_MODE = CONTINUOUS\n"
-                        + "AS SELECT * FROM t1";
-        Operation operation = parse(sql);
-        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
-
-        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
-        // Will be invoked while operation#execute
-        assertThatThrownBy(op::getTableChanges)
-                .isInstanceOf(ValidationException.class)
-                .hasMessage("Changing of REFRESH MODE is unsupported");
+    private static Collection<TestSpec> 
createOrAlterForExistingMaterializedTableFailedCaseSpecs() {
+        return List.of(
+                TestSpec.of(
+                        "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n"
+                                + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT 
ENFORCED"
+                                + ")\n"
+                                + "COMMENT 'materialized table comment'\n"
+                                + "PARTITIONED BY (a, d)\n"
+                                + "WITH (\n"
+                                + "  'connector' = 'filesystem', \n"
+                                + "  'format' = 'json'\n"
+                                + ")\n"
+                                + "FRESHNESS = INTERVAL '30' SECOND\n"
+                                + "REFRESH_MODE = CONTINUOUS\n"
+                                + "AS SELECT * FROM t1",
+                        "Changing of REFRESH MODE is unsupported"),
+                TestSpec.of(
+                        "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n"
+                                + "   a BIGINT, b INT, c INT, d INT, "
+                                + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT 
ENFORCED"
+                                + ")\n"
+                                + "COMMENT 'materialized table comment'\n"
+                                + "PARTITIONED BY (a, d)\n"
+                                + "WITH (\n"
+                                + "  'connector' = 'filesystem', \n"
+                                + "  'format' = 'json'\n"
+                                + ")\n"
+                                + "FRESHNESS = INTERVAL '30' SECOND\n"
+                                + "REFRESH_MODE = FULL\n"
+                                + "AS SELECT * FROM t1",
+                        "Incompatible types for sink column 'b' at position 2. 
"
+                                + "The source column has type 'STRING', while 
the target column has type 'INT'."));
     }
 
     @Test
@@ -817,21 +825,21 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         "When modifying the query of a materialized table, 
currently only support "
                                 + "appending columns at the end of original 
schema, dropping, "
                                 + "renaming, and reordering columns are not 
supported.\n"
-                                + "Column mismatch at position 2: Original 
column is [`c` INT], "
+                                + "Column mismatch at position 3: Original 
column is [`c` INT], "
                                 + "but new column is [`d` STRING]."),
                 TestSpec.of(
                         "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, 
CAST(d AS INT) AS d FROM t3",
                         "When modifying the query of a materialized table, 
currently only support "
                                 + "appending columns at the end of original 
schema, dropping, "
                                 + "renaming, and reordering columns are not 
supported.\n"
-                                + "Column mismatch at position 3: Original 
column is [`d` STRING], "
+                                + "Column mismatch at position 4: Original 
column is [`d` STRING], "
                                 + "but new column is [`d` INT]."),
                 TestSpec.of(
                         "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, 
CAST('d' AS STRING) AS d FROM t3",
                         "When modifying the query of a materialized table, 
currently only support "
                                 + "appending columns at the end of original 
schema, dropping, "
                                 + "renaming, and reordering columns are not 
supported.\n"
-                                + "Column mismatch at position 3: Original 
column is [`d` STRING], "
+                                + "Column mismatch at position 4: Original 
column is [`d` STRING], "
                                 + "but new column is [`d` STRING NOT NULL]."),
                 TestSpec.of(
                         "ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted 
AS SELECT '123'",
@@ -964,7 +972,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         "CREATE MATERIALIZED TABLE users_shops (shop_id 
STRING, user_id STRING)"
                                 + " FRESHNESS = INTERVAL '30' SECOND"
                                 + " AS SELECT 1 AS shop_id, 2 AS user_id",
-                        "Incompatible types for sink column 'shop_id' at 
position 0. The source column has type 'INT NOT NULL', "
+                        "Incompatible types for sink column 'shop_id' at 
position 1. The source column has type 'INT NOT NULL', "
                                 + "while the target column has type 
'STRING'."),
                 TestSpec.of(
                         "CREATE MATERIALIZED TABLE users_shops (shop_id INT, 
WATERMARK FOR ts AS `ts` - INTERVAL '5' SECOND)"
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index dba843b5eb7..524bdcce224 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -215,7 +215,7 @@ class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConversionTe
         assertThatThrownBy(() -> parseAndConvert(sql))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Incompatible types for sink column 'a' at position 0. 
"
+                        "Incompatible types for sink column 'a' at position 1. 
"
                                 + "The source column has type 'BIGINT NOT 
NULL', while the target "
                                 + "column has type 'BOOLEAN'.");
     }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index e0b54370e53..f8546d4a678 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -595,7 +595,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends 
TableITCaseBase {
 
     assertThatExceptionOfType(classOf[ValidationException])
       .isThrownBy(() => tableEnv.executeSql(query).await())
-      .withMessageContaining("Incompatible types for sink column 'c' at 
position 1.")
+      .withMessageContaining("Incompatible types for sink column 'c' at 
position 2.")
   }
 
   @TestTemplate

Reply via email to