This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2260c11eb6e [FLINK-38802][table] DDL with duplicated keys in table options should not fail
2260c11eb6e is described below

commit 2260c11eb6e34656eceb6905491f2d905b8ab7a6
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Dec 16 14:03:32 2025 +0100

    [FLINK-38802][table] DDL with duplicated keys in table options should not fail
---
 .../org/apache/flink/sql/parser/SqlParseUtils.java     | 16 +++++++++++++---
 .../sql/parser/ddl/catalog/SqlAlterCatalogOptions.java | 11 ++---------
 .../operations/SqlDdlToOperationConverterTest.java     | 18 ++++++++++++------
 3 files changed, 27 insertions(+), 18 deletions(-)

diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
index 2a289d09bd8..e115a0bc6c9 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
@@ -24,9 +24,12 @@ import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.util.NlsString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,6 +39,7 @@ import java.util.stream.Collectors;
 
 /** Utils methods for parsing DDLs. */
 public class SqlParseUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SqlParseUtils.class);
 
     private SqlParseUtils() {}
 
@@ -85,9 +89,15 @@ public class SqlParseUtils {
         if (propList == null) {
             return Map.of();
         }
-        return propList.getList().stream()
-                .map(p -> (SqlTableOption) p)
-                .collect(Collectors.toMap(k -> k.getKeyString(), 
SqlTableOption::getValueString));
+        final Map<String, String> result = new HashMap<>();
+        for (SqlNode node : propList) {
+            final SqlTableOption tableOption = (SqlTableOption) node;
+            final String key = tableOption.getKeyString();
+            if (result.put(key, tableOption.getValueString()) != null) {
+                LOG.warn("There are duplicated values for the same key {}", 
key);
+            }
+        }
+        return result;
     }
 
     public static List<String> extractList(
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/catalog/SqlAlterCatalogOptions.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/catalog/SqlAlterCatalogOptions.java
index ffcd2871811..ce7d14ae374 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/catalog/SqlAlterCatalogOptions.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/catalog/SqlAlterCatalogOptions.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.sql.parser.ddl.catalog;
 
+import org.apache.flink.sql.parser.SqlParseUtils;
 import org.apache.flink.sql.parser.SqlUnparseUtils;
-import org.apache.flink.sql.parser.ddl.SqlTableOption;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
@@ -30,7 +30,6 @@ import org.apache.calcite.util.ImmutableNullableList;
 
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 
@@ -55,13 +54,7 @@ public class SqlAlterCatalogOptions extends SqlAlterCatalog {
     }
 
     public Map<String, String> getProperties() {
-        return propertyList.stream()
-                .map(p -> (SqlTableOption) p)
-                .collect(
-                        Collectors.toMap(
-                                SqlTableOption::getKeyString,
-                                SqlTableOption::getValueString,
-                                (option1, option2) -> option2));
+        return SqlParseUtils.extractMap(propertyList);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 74c7354908a..abaeac9072b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -1117,19 +1117,24 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
         // test alter table options
         checkAlterNonExistTable("alter table %s nonexistent set ('k1' = 'v1', 
'K2' = 'V2')");
         Operation operation =
-                parse("alter table if exists cat1.db1.tb1 set ('k1' = 'v1', 
'K2' = 'V2')");
+                parse(
+                        "alter table if exists cat1.db1.tb1 set ('k1' = 'v1', 
'k2' = 'v2', 'K2' = 'V1', 'K2' = 'V2')");
         Map<String, String> expectedOptions = new HashMap<>();
         expectedOptions.put("connector", "dummy");
         expectedOptions.put("k", "v");
         expectedOptions.put("k1", "v1");
+        expectedOptions.put("k2", "v2");
         expectedOptions.put("K2", "V2");
 
         assertAlterTableOptions(
                 operation,
                 expectedIdentifier,
                 expectedOptions,
-                Arrays.asList(TableChange.set("k1", "v1"), 
TableChange.set("K2", "V2")),
-                "ALTER TABLE IF EXISTS cat1.db1.tb1\n  SET 'k1' = 'v1',\n  SET 
'K2' = 'V2'");
+                List.of(
+                        TableChange.set("k1", "v1"),
+                        TableChange.set("k2", "v2"),
+                        TableChange.set("K2", "V2")),
+                "ALTER TABLE IF EXISTS cat1.db1.tb1\n  SET 'k1' = 'v1',\n  SET 
'k2' = 'v2',\n  SET 'K2' = 'V2'");
 
         // test alter table reset
         checkAlterNonExistTable("alter table %s nonexistent reset ('k')");
@@ -1137,8 +1142,8 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
         assertAlterTableOptions(
                 operation,
                 expectedIdentifier,
-                Collections.singletonMap("connector", "dummy"),
-                Collections.singletonList(TableChange.reset("k")),
+                Map.of("connector", "dummy"),
+                List.of(TableChange.reset("k")),
                 "ALTER TABLE IF EXISTS cat1.db1.tb1\n  RESET 'k'");
         assertThatThrownBy(() -> parse("alter table cat1.db1.tb1 reset 
('connector')"))
                 .isInstanceOf(ValidationException.class)
@@ -2885,7 +2890,8 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
         
assertThat(alterTableOptionsOperation.getTableIdentifier()).isEqualTo(expectedIdentifier);
         assertThat(alterTableOptionsOperation.getNewTable().getOptions())
                 .isEqualTo(expectedOptions);
-        
assertThat(expectedChanges).isEqualTo(alterTableOptionsOperation.getTableChanges());
+        assertThat(expectedChanges)
+                
.containsExactlyInAnyOrderElementsOf(alterTableOptionsOperation.getTableChanges());
         
assertThat(alterTableOptionsOperation.asSummaryString()).isEqualTo(expectedSummary);
     }
 

Reply via email to