This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2260c11eb6e [FLINK-38802][table] DDL with duplicated keys in table options should not fail
2260c11eb6e is described below
commit 2260c11eb6e34656eceb6905491f2d905b8ab7a6
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Dec 16 14:03:32 2025 +0100
[FLINK-38802][table] DDL with duplicated keys in table options should not fail
---
.../org/apache/flink/sql/parser/SqlParseUtils.java | 16 +++++++++++++---
.../sql/parser/ddl/catalog/SqlAlterCatalogOptions.java | 11 ++---------
.../operations/SqlDdlToOperationConverterTest.java | 18 ++++++++++++------
3 files changed, 27 insertions(+), 18 deletions(-)
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
index 2a289d09bd8..e115a0bc6c9 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
@@ -24,9 +24,12 @@ import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.util.NlsString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -36,6 +39,7 @@ import java.util.stream.Collectors;
/** Utils methods for parsing DDLs. */
public class SqlParseUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(SqlParseUtils.class);
private SqlParseUtils() {}
@@ -85,9 +89,15 @@ public class SqlParseUtils {
if (propList == null) {
return Map.of();
}
- return propList.getList().stream()
- .map(p -> (SqlTableOption) p)
- .collect(Collectors.toMap(k -> k.getKeyString(),
SqlTableOption::getValueString));
+ final Map<String, String> result = new HashMap<>();
+ for (SqlNode node : propList) {
+ final SqlTableOption tableOption = (SqlTableOption) node;
+ final String key = tableOption.getKeyString();
+ if (result.put(key, tableOption.getValueString()) != null) {
+ LOG.warn("There are duplicated values for the same key {}",
key);
+ }
+ }
+ return result;
}
public static List<String> extractList(
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/catalog/SqlAlterCatalogOptions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/catalog/SqlAlterCatalogOptions.java
index ffcd2871811..ce7d14ae374 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/catalog/SqlAlterCatalogOptions.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/catalog/SqlAlterCatalogOptions.java
@@ -18,8 +18,8 @@
package org.apache.flink.sql.parser.ddl.catalog;
+import org.apache.flink.sql.parser.SqlParseUtils;
import org.apache.flink.sql.parser.SqlUnparseUtils;
-import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
@@ -30,7 +30,6 @@ import org.apache.calcite.util.ImmutableNullableList;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
@@ -55,13 +54,7 @@ public class SqlAlterCatalogOptions extends SqlAlterCatalog {
}
public Map<String, String> getProperties() {
- return propertyList.stream()
- .map(p -> (SqlTableOption) p)
- .collect(
- Collectors.toMap(
- SqlTableOption::getKeyString,
- SqlTableOption::getValueString,
- (option1, option2) -> option2));
+ return SqlParseUtils.extractMap(propertyList);
}
@Override
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 74c7354908a..abaeac9072b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -1117,19 +1117,24 @@ class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBas
// test alter table options
checkAlterNonExistTable("alter table %s nonexistent set ('k1' = 'v1',
'K2' = 'V2')");
Operation operation =
- parse("alter table if exists cat1.db1.tb1 set ('k1' = 'v1',
'K2' = 'V2')");
+ parse(
+ "alter table if exists cat1.db1.tb1 set ('k1' = 'v1',
'k2' = 'v2', 'K2' = 'V1', 'K2' = 'V2')");
Map<String, String> expectedOptions = new HashMap<>();
expectedOptions.put("connector", "dummy");
expectedOptions.put("k", "v");
expectedOptions.put("k1", "v1");
+ expectedOptions.put("k2", "v2");
expectedOptions.put("K2", "V2");
assertAlterTableOptions(
operation,
expectedIdentifier,
expectedOptions,
- Arrays.asList(TableChange.set("k1", "v1"),
TableChange.set("K2", "V2")),
- "ALTER TABLE IF EXISTS cat1.db1.tb1\n SET 'k1' = 'v1',\n SET
'K2' = 'V2'");
+ List.of(
+ TableChange.set("k1", "v1"),
+ TableChange.set("k2", "v2"),
+ TableChange.set("K2", "V2")),
+ "ALTER TABLE IF EXISTS cat1.db1.tb1\n SET 'k1' = 'v1',\n SET
'k2' = 'v2',\n SET 'K2' = 'V2'");
// test alter table reset
checkAlterNonExistTable("alter table %s nonexistent reset ('k')");
@@ -1137,8 +1142,8 @@ class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBas
assertAlterTableOptions(
operation,
expectedIdentifier,
- Collections.singletonMap("connector", "dummy"),
- Collections.singletonList(TableChange.reset("k")),
+ Map.of("connector", "dummy"),
+ List.of(TableChange.reset("k")),
"ALTER TABLE IF EXISTS cat1.db1.tb1\n RESET 'k'");
assertThatThrownBy(() -> parse("alter table cat1.db1.tb1 reset
('connector')"))
.isInstanceOf(ValidationException.class)
@@ -2885,7 +2890,8 @@ class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBas
assertThat(alterTableOptionsOperation.getTableIdentifier()).isEqualTo(expectedIdentifier);
assertThat(alterTableOptionsOperation.getNewTable().getOptions())
.isEqualTo(expectedOptions);
-
assertThat(expectedChanges).isEqualTo(alterTableOptionsOperation.getTableChanges());
+ assertThat(expectedChanges)
+
.containsExactlyInAnyOrderElementsOf(alterTableOptionsOperation.getTableChanges());
assertThat(alterTableOptionsOperation.asSummaryString()).isEqualTo(expectedSummary);
}