This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit d604559d114e0b0220dcf6ed8f896c6611d935e1
Author: liming.1018 <[email protected]>
AuthorDate: Tue Jan 7 09:50:45 2025 +0800

    [flink][action] add '`' to the fields of merge into action to avoid 
exceptions when the field name is an SQL keyword. (#4846)
---
 .../paimon/flink/action/MergeIntoAction.java       | 36 +++++++++++++++++++---
 .../paimon/flink/action/MergeIntoActionITCase.java | 23 ++++++++++++++
 2 files changed, 55 insertions(+), 4 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index 1ecd23ea62..7254999797 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.Table;
@@ -77,6 +78,8 @@ public class MergeIntoAction extends TableActionBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MergeIntoAction.class);
 
+    public static final String IDENTIFIER_QUOTE = "`";
+
     // primary keys of target table
     private final List<String> primaryKeys;
 
@@ -333,7 +336,7 @@ public class MergeIntoAction extends TableActionBase {
         String query =
                 String.format(
                         "SELECT %s FROM %s INNER JOIN %s ON %s %s",
-                        String.join(",", project),
+                        String.join(",", normalizeFieldName(project)),
                         escapedTargetName(),
                         escapedSourceName(),
                         mergeCondition,
@@ -377,7 +380,7 @@ public class MergeIntoAction extends TableActionBase {
         String query =
                 String.format(
                         "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s 
WHERE %s) %s",
-                        String.join(",", project),
+                        String.join(",", normalizeFieldName(project)),
                         escapedTargetName(),
                         escapedSourceName(),
                         mergeCondition,
@@ -408,7 +411,7 @@ public class MergeIntoAction extends TableActionBase {
         String query =
                 String.format(
                         "SELECT %s FROM %s INNER JOIN %s ON %s %s",
-                        String.join(",", project),
+                        String.join(",", normalizeFieldName(project)),
                         escapedTargetName(),
                         escapedSourceName(),
                         mergeCondition,
@@ -430,7 +433,7 @@ public class MergeIntoAction extends TableActionBase {
         String query =
                 String.format(
                         "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s 
WHERE %s) %s",
-                        String.join(",", targetFieldNames),
+                        String.join(",", normalizeFieldName(targetFieldNames)),
                         escapedTargetName(),
                         escapedSourceName(),
                         mergeCondition,
@@ -519,4 +522,29 @@ public class MergeIntoAction extends TableActionBase {
                 .map(s -> String.format("`%s`", s))
                 .collect(Collectors.joining("."));
     }
+
+    private List<String> normalizeFieldName(List<String> fieldNames) {
+        return 
fieldNames.stream().map(this::normalizeFieldName).collect(Collectors.toList());
+    }
+
+    private String normalizeFieldName(String fieldName) {
+        if (StringUtils.isNullOrWhitespaceOnly(fieldName) || 
fieldName.endsWith(IDENTIFIER_QUOTE)) {
+            return fieldName;
+        }
+
+        String[] splitFieldNames = fieldName.split("\\.");
+        if (!targetFieldNames.contains(splitFieldNames[splitFieldNames.length 
- 1])) {
+            return fieldName;
+        }
+
+        return String.join(
+                ".",
+                Arrays.stream(splitFieldNames)
+                        .map(
+                                part ->
+                                        part.endsWith(IDENTIFIER_QUOTE)
+                                                ? part
+                                                : IDENTIFIER_QUOTE + part + 
IDENTIFIER_QUOTE)
+                        .toArray(String[]::new));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 3907c03985..25b4055465 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -688,6 +688,29 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
         }
     }
 
+    @Test
+    public void testSqlWithKeywordCase() throws Exception {
+        // drop table S
+        sEnv.executeSql("DROP TABLE T");
+        sEnv.executeSql(
+                buildDdl(
+                        "T",
+                        Arrays.asList("k INT", "`language` STRING", "dt 
STRING"),
+                        Arrays.asList("k", "dt"),
+                        Collections.singletonList("dt"),
+                        Collections.emptyMap()));
+        insertInto("T", "(1, 'v_1', '02-27')", "(13, 'v_13', '02-29')");
+
+        MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, 
database, "T");
+        action.withSourceTable("S")
+                .withMergeCondition("T.k = S.k AND T.dt = S.dt")
+                .withMatchedDelete("S.k < 12");
+
+        List<Row> batchExpected = Arrays.asList(changelogRow("+I", 13, "v_13", 
"02-29"));
+        action.build().run();
+        testBatchRead(buildSimpleQuery("T"), batchExpected);
+    }
+
     private void validateActionRunResult(
             MergeIntoAction action, List<Row> streamingExpected, List<Row> 
batchExpected)
             throws Exception {

Reply via email to