This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9b0aaf30d30 [FLINK-39278][table] Indexing of columns in `CREATE [MATERIALIZED ]TABLE` should start with 1 in error messages
9b0aaf30d30 is described below
commit 9b0aaf30d308ae58920aade8f6a789f6e134bc02
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Mar 20 21:01:36 2026 +0100
[FLINK-39278][table] Indexing of columns in `CREATE [MATERIALIZED ]TABLE`
should start with 1 in error messages
---
.../MaterializedTableChangeHandler.java | 4 +-
.../table/planner/connectors/DynamicSinkUtils.java | 2 +-
.../operations/converters/SchemaBuilderUtil.java | 2 +-
.../operations/SqlCTASNodeToOperationTest.java | 2 +-
...erializedTableNodeToOperationConverterTest.java | 64 ++++++++++++----------
.../SqlRTASNodeToOperationConverterTest.java | 2 +-
.../table/planner/catalog/CatalogTableITCase.scala | 2 +-
7 files changed, 43 insertions(+), 35 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
index 7acde833570..8602812b6c3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
@@ -345,7 +345,7 @@ public class MaterializedTableChangeHandler {
"When modifying the query of a materialized table,
"
+ "currently only support appending
columns at the end of original schema, dropping, renaming, and reordering
columns are not supported.\n"
+ "Column mismatch at position %d:
Original column is [%s], but new column is [%s].",
- oldPosition, column, column));
+ oldPosition + 1, column, column));
}
ColumnPosition position = columnWithChangedPosition.getNewPosition();
@@ -525,7 +525,7 @@ public class MaterializedTableChangeHandler {
"When modifying the query of a materialized table, "
+ "currently only support appending columns at
the end of original schema, dropping, renaming, and reordering columns are not
supported.\n"
+ "Column mismatch at position %d: Original
column is [%s], but new column is [%s].",
- position, oldColumn, newColumn));
+ position + 1, oldColumn, newColumn));
}
private void setColumnAtPosition(UnresolvedColumn column, ColumnPosition
position) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index b2cc0b8e825..219b995864f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -420,7 +420,7 @@ public final class DynamicSinkUtils {
throw createSchemaMismatchException(
String.format(
"Incompatible types for sink column '%s' at
position %s.",
- sinkFields.get(i).getName(), i),
+ sinkFields.get(i).getName(), i + 1),
tableDebugName,
queryFields,
sinkFields);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java
index cfb5fb8abdb..356114b614c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java
@@ -313,7 +313,7 @@ public class SchemaBuilderUtil {
"Incompatible types for sink column '%s' at
position %d. "
+ "The source column has type '%s', "
+ "while the target column has type '%s'.",
- columnName, columnPos, sourceColumnType,
sinkColumnType));
+ columnName, columnPos + 1, sourceColumnType,
sinkColumnType));
}
}
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
index 3add4e37f46..d610569bf95 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
@@ -342,7 +342,7 @@ class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTestBase {
assertThatThrownBy(() -> parseAndConvert(sql))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Incompatible types for sink column 'f0' at position
0. "
+ "Incompatible types for sink column 'f0' at position
1. "
+ "The source column has type 'INT NOT NULL',
while the target "
+ "column has type 'BOOLEAN'.");
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index bcb1705eb03..00246e79719 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -691,30 +691,38 @@ class SqlMaterializedTableNodeToOperationConverterTest
assertThat(materializedTable.getOrigin()).isEqualTo(expected);
}
- @Test
- void testCreateOrAlterMaterializedTableWithChangingRefreshMode() {
- // Changing refresh mode is not supported
- final String sql =
- "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n"
- + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
- + ")\n"
- + "COMMENT 'materialized table comment'\n"
- + "PARTITIONED BY (a, d)\n"
- + "WITH (\n"
- + " 'connector' = 'filesystem', \n"
- + " 'format' = 'json'\n"
- + ")\n"
- + "FRESHNESS = INTERVAL '30' SECOND\n"
- + "REFRESH_MODE = CONTINUOUS\n"
- + "AS SELECT * FROM t1";
- Operation operation = parse(sql);
-
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
-
- FullAlterMaterializedTableOperation op =
(FullAlterMaterializedTableOperation) operation;
- // Will be invoked while operation#execute
- assertThatThrownBy(op::getTableChanges)
- .isInstanceOf(ValidationException.class)
- .hasMessage("Changing of REFRESH MODE is unsupported");
+ private static Collection<TestSpec>
createOrAlterForExistingMaterializedTableFailedCaseSpecs() {
+ return List.of(
+ TestSpec.of(
+ "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n"
+ + " CONSTRAINT ct1 PRIMARY KEY(a) NOT
ENFORCED"
+ + ")\n"
+ + "COMMENT 'materialized table comment'\n"
+ + "PARTITIONED BY (a, d)\n"
+ + "WITH (\n"
+ + " 'connector' = 'filesystem', \n"
+ + " 'format' = 'json'\n"
+ + ")\n"
+ + "FRESHNESS = INTERVAL '30' SECOND\n"
+ + "REFRESH_MODE = CONTINUOUS\n"
+ + "AS SELECT * FROM t1",
+ "Changing of REFRESH MODE is unsupported"),
+ TestSpec.of(
+ "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n"
+ + " a BIGINT, b INT, c INT, d INT, "
+ + " CONSTRAINT ct1 PRIMARY KEY(a) NOT
ENFORCED"
+ + ")\n"
+ + "COMMENT 'materialized table comment'\n"
+ + "PARTITIONED BY (a, d)\n"
+ + "WITH (\n"
+ + " 'connector' = 'filesystem', \n"
+ + " 'format' = 'json'\n"
+ + ")\n"
+ + "FRESHNESS = INTERVAL '30' SECOND\n"
+ + "REFRESH_MODE = FULL\n"
+ + "AS SELECT * FROM t1",
+ "Incompatible types for sink column 'b' at position 2.
"
+ + "The source column has type 'STRING', while
the target column has type 'INT'."));
}
@Test
@@ -817,21 +825,21 @@ class SqlMaterializedTableNodeToOperationConverterTest
"When modifying the query of a materialized table,
currently only support "
+ "appending columns at the end of original
schema, dropping, "
+ "renaming, and reordering columns are not
supported.\n"
- + "Column mismatch at position 2: Original
column is [`c` INT], "
+ + "Column mismatch at position 3: Original
column is [`c` INT], "
+ "but new column is [`d` STRING]."),
TestSpec.of(
"ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c,
CAST(d AS INT) AS d FROM t3",
"When modifying the query of a materialized table,
currently only support "
+ "appending columns at the end of original
schema, dropping, "
+ "renaming, and reordering columns are not
supported.\n"
- + "Column mismatch at position 3: Original
column is [`d` STRING], "
+ + "Column mismatch at position 4: Original
column is [`d` STRING], "
+ "but new column is [`d` INT]."),
TestSpec.of(
"ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c,
CAST('d' AS STRING) AS d FROM t3",
"When modifying the query of a materialized table,
currently only support "
+ "appending columns at the end of original
schema, dropping, "
+ "renaming, and reordering columns are not
supported.\n"
- + "Column mismatch at position 3: Original
column is [`d` STRING], "
+ + "Column mismatch at position 4: Original
column is [`d` STRING], "
+ "but new column is [`d` STRING NOT NULL]."),
TestSpec.of(
"ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted
AS SELECT '123'",
@@ -964,7 +972,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
"CREATE MATERIALIZED TABLE users_shops (shop_id
STRING, user_id STRING)"
+ " FRESHNESS = INTERVAL '30' SECOND"
+ " AS SELECT 1 AS shop_id, 2 AS user_id",
- "Incompatible types for sink column 'shop_id' at
position 0. The source column has type 'INT NOT NULL', "
+ "Incompatible types for sink column 'shop_id' at
position 1. The source column has type 'INT NOT NULL', "
+ "while the target column has type
'STRING'."),
TestSpec.of(
"CREATE MATERIALIZED TABLE users_shops (shop_id INT,
WATERMARK FOR ts AS `ts` - INTERVAL '5' SECOND)"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index dba843b5eb7..524bdcce224 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -215,7 +215,7 @@ class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConversionTe
assertThatThrownBy(() -> parseAndConvert(sql))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Incompatible types for sink column 'a' at position 0.
"
+ "Incompatible types for sink column 'a' at position 1.
"
+ "The source column has type 'BIGINT NOT
NULL', while the target "
+ "column has type 'BOOLEAN'.");
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index e0b54370e53..f8546d4a678 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -595,7 +595,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends TableITCaseBase {
assertThatExceptionOfType(classOf[ValidationException])
.isThrownBy(() => tableEnv.executeSql(query).await())
- .withMessageContaining("Incompatible types for sink column 'c' at
position 1.")
+ .withMessageContaining("Incompatible types for sink column 'c' at
position 2.")
}
@TestTemplate