This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.0 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit d604559d114e0b0220dcf6ed8f896c6611d935e1 Author: liming.1018 <[email protected]> AuthorDate: Tue Jan 7 09:50:45 2025 +0800 [flink][action] add '`' to the fields of merge into action to avoid exceptions when the field name is an SQL keyword. (#4846) --- .../paimon/flink/action/MergeIntoAction.java | 36 +++++++++++++++++++--- .../paimon/flink/action/MergeIntoActionITCase.java | 23 ++++++++++++++ 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java index 1ecd23ea62..7254999797 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java @@ -23,6 +23,7 @@ import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.StringUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.Table; @@ -77,6 +78,8 @@ public class MergeIntoAction extends TableActionBase { private static final Logger LOG = LoggerFactory.getLogger(MergeIntoAction.class); + public static final String IDENTIFIER_QUOTE = "`"; + // primary keys of target table private final List<String> primaryKeys; @@ -333,7 +336,7 @@ public class MergeIntoAction extends TableActionBase { String query = String.format( "SELECT %s FROM %s INNER JOIN %s ON %s %s", - String.join(",", project), + String.join(",", normalizeFieldName(project)), escapedTargetName(), escapedSourceName(), mergeCondition, @@ -377,7 +380,7 @@ public class MergeIntoAction extends TableActionBase { String query = String.format( "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s", - String.join(",", project), + String.join(",", normalizeFieldName(project)), escapedTargetName(), escapedSourceName(), mergeCondition, @@ -408,7 +411,7 @@ public class MergeIntoAction extends TableActionBase { String query = String.format( "SELECT %s FROM %s INNER JOIN %s ON %s %s", - String.join(",", project), + String.join(",", normalizeFieldName(project)), escapedTargetName(), escapedSourceName(), mergeCondition, @@ -430,7 +433,7 @@ public class MergeIntoAction extends TableActionBase { String query = String.format( "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s", - String.join(",", targetFieldNames), + String.join(",", normalizeFieldName(targetFieldNames)), escapedTargetName(), escapedSourceName(), mergeCondition, @@ -519,4 +522,29 @@ public class MergeIntoAction extends TableActionBase { .map(s -> String.format("`%s`", s)) .collect(Collectors.joining(".")); } + + private List<String> normalizeFieldName(List<String> fieldNames) { + return fieldNames.stream().map(this::normalizeFieldName).collect(Collectors.toList()); + } + + private String normalizeFieldName(String fieldName) { + if (StringUtils.isNullOrWhitespaceOnly(fieldName) || fieldName.endsWith(IDENTIFIER_QUOTE)) { + return fieldName; + } + + String[] splitFieldNames = fieldName.split("\\."); + if (!targetFieldNames.contains(splitFieldNames[splitFieldNames.length - 1])) { + return fieldName; + } + + return String.join( + ".", + Arrays.stream(splitFieldNames) + .map( + part -> + part.endsWith(IDENTIFIER_QUOTE) + ? part + : IDENTIFIER_QUOTE + part + IDENTIFIER_QUOTE) + .toArray(String[]::new)); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java index 3907c03985..25b4055465 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java @@ -688,6 +688,29 @@ public class MergeIntoActionITCase extends ActionITCaseBase { } } + @Test + public void testSqlWithKeywordCase() throws Exception { + // drop table S + sEnv.executeSql("DROP TABLE T"); + sEnv.executeSql( + buildDdl( + "T", + Arrays.asList("k INT", "`language` STRING", "dt STRING"), + Arrays.asList("k", "dt"), + Collections.singletonList("dt"), + Collections.emptyMap())); + insertInto("T", "(1, 'v_1', '02-27')", "(13, 'v_13', '02-29')"); + + MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T"); + action.withSourceTable("S") + .withMergeCondition("T.k = S.k AND T.dt = S.dt") + .withMatchedDelete("S.k < 12"); + + List<Row> batchExpected = Arrays.asList(changelogRow("+I", 13, "v_13", "02-29")); + action.build().run(); + testBatchRead(buildSimpleQuery("T"), batchExpected); + } + private void validateActionRunResult( MergeIntoAction action, List<Row> streamingExpected, List<Row> batchExpected) throws Exception {
