This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d62f67b710 [flink][action] add '`' to the fields of merge into action
to avoid exceptions when the field name is an SQL keyword. (#4846)
d62f67b710 is described below
commit d62f67b710c2ec0dcfda8a2fe87ed38dd943e0ca
Author: liming.1018 <[email protected]>
AuthorDate: Tue Jan 7 09:50:45 2025 +0800
[flink][action] add '`' to the fields of merge into action to avoid
exceptions when the field name is an SQL keyword. (#4846)
---
.../paimon/flink/action/MergeIntoAction.java | 36 +++++++++++++++++++---
.../paimon/flink/action/MergeIntoActionITCase.java | 23 ++++++++++++++
2 files changed, 55 insertions(+), 4 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index 1ecd23ea62..7254999797 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.StringUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
@@ -77,6 +78,8 @@ public class MergeIntoAction extends TableActionBase {
private static final Logger LOG =
LoggerFactory.getLogger(MergeIntoAction.class);
+ public static final String IDENTIFIER_QUOTE = "`";
+
// primary keys of target table
private final List<String> primaryKeys;
@@ -333,7 +336,7 @@ public class MergeIntoAction extends TableActionBase {
String query =
String.format(
"SELECT %s FROM %s INNER JOIN %s ON %s %s",
- String.join(",", project),
+ String.join(",", normalizeFieldName(project)),
escapedTargetName(),
escapedSourceName(),
mergeCondition,
@@ -377,7 +380,7 @@ public class MergeIntoAction extends TableActionBase {
String query =
String.format(
"SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s
WHERE %s) %s",
- String.join(",", project),
+ String.join(",", normalizeFieldName(project)),
escapedTargetName(),
escapedSourceName(),
mergeCondition,
@@ -408,7 +411,7 @@ public class MergeIntoAction extends TableActionBase {
String query =
String.format(
"SELECT %s FROM %s INNER JOIN %s ON %s %s",
- String.join(",", project),
+ String.join(",", normalizeFieldName(project)),
escapedTargetName(),
escapedSourceName(),
mergeCondition,
@@ -430,7 +433,7 @@ public class MergeIntoAction extends TableActionBase {
String query =
String.format(
"SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s
WHERE %s) %s",
- String.join(",", targetFieldNames),
+ String.join(",", normalizeFieldName(targetFieldNames)),
escapedTargetName(),
escapedSourceName(),
mergeCondition,
@@ -519,4 +522,29 @@ public class MergeIntoAction extends TableActionBase {
.map(s -> String.format("`%s`", s))
.collect(Collectors.joining("."));
}
+
+ private List<String> normalizeFieldName(List<String> fieldNames) {
+ return
fieldNames.stream().map(this::normalizeFieldName).collect(Collectors.toList());
+ }
+
+ private String normalizeFieldName(String fieldName) {
+ if (StringUtils.isNullOrWhitespaceOnly(fieldName) ||
fieldName.endsWith(IDENTIFIER_QUOTE)) {
+ return fieldName;
+ }
+
+ String[] splitFieldNames = fieldName.split("\\.");
+ if (!targetFieldNames.contains(splitFieldNames[splitFieldNames.length
- 1])) {
+ return fieldName;
+ }
+
+ return String.join(
+ ".",
+ Arrays.stream(splitFieldNames)
+ .map(
+ part ->
+ part.endsWith(IDENTIFIER_QUOTE)
+ ? part
+ : IDENTIFIER_QUOTE + part +
IDENTIFIER_QUOTE)
+ .toArray(String[]::new));
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 3907c03985..25b4055465 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -688,6 +688,29 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
}
}
+ @Test
+ public void testSqlWithKeywordCase() throws Exception {
+ // drop table S
+ sEnv.executeSql("DROP TABLE T");
+ sEnv.executeSql(
+ buildDdl(
+ "T",
+ Arrays.asList("k INT", "`language` STRING", "dt
STRING"),
+ Arrays.asList("k", "dt"),
+ Collections.singletonList("dt"),
+ Collections.emptyMap()));
+ insertInto("T", "(1, 'v_1', '02-27')", "(13, 'v_13', '02-29')");
+
+ MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse,
database, "T");
+ action.withSourceTable("S")
+ .withMergeCondition("T.k = S.k AND T.dt = S.dt")
+ .withMatchedDelete("S.k < 12");
+
+ List<Row> batchExpected = Arrays.asList(changelogRow("+I", 13, "v_13",
"02-29"));
+ action.build().run();
+ testBatchRead(buildSimpleQuery("T"), batchExpected);
+ }
+
private void validateActionRunResult(
MergeIntoAction action, List<Row> streamingExpected, List<Row>
batchExpected)
throws Exception {