Maxime Radigue created FLINK-32600:
--------------------------------------
Summary: SQL Server JDBC Connector wrong quoteIdentifier + issue
when there is no update statement in upsert statement
Key: FLINK-32600
URL: https://issues.apache.org/jira/browse/FLINK-32600
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: jdbc-3.1.1
Reporter: Maxime Radigue
Hi, I've discovered two issues in the MS SQL Server JDBC connector. Both are
located in the file SqlServerDialect.java:
* The dialect's quoteIdentifier is wrong: MS SQL Server quotes identifiers
with square brackets, i.e. [identifier].
* Upserting into a table whose columns are all part of the primary key, e.g.
MyTable(id1,id2) with the compound primary key (id1,id2), fails with a syntax
error because the generated upsert statement has no fields to update.
These are the fixes I've tested for the two functions involved:
@Override
public String quoteIdentifier(String identifier) {
    return "[" + identifier + "]";
}
@Override
public Optional<String> getUpsertStatement(
        String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    // Columns outside the unique key; empty when every column is a key column.
    List<String> nonUniqueKeyFields =
            Arrays.stream(fieldNames)
                    .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f))
                    .collect(Collectors.toList());
    String fieldsProjection =
            Arrays.stream(fieldNames)
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining(", "));
    // Named parameters aliased to their column names, e.g. ":id1 [id1]".
    String valuesBinding =
            Arrays.stream(fieldNames)
                    .map(f -> ":" + f + " " + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));
    String usingClause = String.format("SELECT %s", valuesBinding);
    String onConditions =
            Arrays.stream(uniqueKeyFields)
                    .map(
                            f ->
                                    "[TARGET]."
                                            + quoteIdentifier(f)
                                            + "=[SOURCE]."
                                            + quoteIdentifier(f))
                    .collect(Collectors.joining(" AND "));
    String updateSetClause =
            nonUniqueKeyFields.stream()
                    .map(
                            f ->
                                    "[TARGET]."
                                            + quoteIdentifier(f)
                                            + "=[SOURCE]."
                                            + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));
    String insertValues =
            Arrays.stream(fieldNames)
                    .map(f -> "[SOURCE]." + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));
    StringBuilder sb = new StringBuilder();
    sb.append(
            String.format(
                    "MERGE INTO %s AS [TARGET] USING (%s) AS [SOURCE] ON (%s)",
                    quoteIdentifier(tableName), usingClause, onConditions));
    // Emit the UPDATE branch only when there is at least one non-key column.
    if (StringUtils.isNotEmpty(updateSetClause)) {
        sb.append(String.format(" WHEN MATCHED THEN UPDATE SET %s", updateSetClause));
    }
    sb.append(
            String.format(
                    " WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);",
                    fieldsProjection, insertValues));
    return Optional.of(sb.toString());
}
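
To make the key-only case easier to check outside Flink, here is a minimal standalone sketch of the same statement-building logic. The class name MergeSketch and the helper names quote and upsert are hypothetical and exist only for this illustration; they are not part of SqlServerDialect. The third column "v" in the second call is likewise made up to show the UPDATE branch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical standalone class; mirrors the proposed logic without Flink dependencies.
public class MergeSketch {

    // Square-bracket quoting, as in the proposed quoteIdentifier.
    static String quote(String identifier) {
        return "[" + identifier + "]";
    }

    static String upsert(String table, String[] fields, String[] keys) {
        List<String> keyList = Arrays.asList(keys);
        String using =
                Arrays.stream(fields)
                        .map(f -> ":" + f + " " + quote(f))
                        .collect(Collectors.joining(", "));
        String on =
                Arrays.stream(keys)
                        .map(f -> "[TARGET]." + quote(f) + "=[SOURCE]." + quote(f))
                        .collect(Collectors.joining(" AND "));
        // Only non-key columns are updated; this is empty for a key-only table.
        String set =
                Arrays.stream(fields)
                        .filter(f -> !keyList.contains(f))
                        .map(f -> "[TARGET]." + quote(f) + "=[SOURCE]." + quote(f))
                        .collect(Collectors.joining(", "));
        String columns =
                Arrays.stream(fields).map(MergeSketch::quote).collect(Collectors.joining(", "));
        String values =
                Arrays.stream(fields)
                        .map(f -> "[SOURCE]." + quote(f))
                        .collect(Collectors.joining(", "));

        StringBuilder sb = new StringBuilder();
        sb.append("MERGE INTO ").append(quote(table))
                .append(" AS [TARGET] USING (SELECT ").append(using)
                .append(") AS [SOURCE] ON (").append(on).append(")");
        // Skip the UPDATE branch entirely when there is nothing to update.
        if (!set.isEmpty()) {
            sb.append(" WHEN MATCHED THEN UPDATE SET ").append(set);
        }
        sb.append(" WHEN NOT MATCHED THEN INSERT (").append(columns)
                .append(") VALUES (").append(values).append(");");
        return sb.toString();
    }

    public static void main(String[] args) {
        // Key-only table from the report: no WHEN MATCHED branch is generated.
        System.out.println(
                upsert("MyTable", new String[] {"id1", "id2"}, new String[] {"id1", "id2"}));
        // With an extra, made-up non-key column "v": the UPDATE branch appears.
        System.out.println(
                upsert("MyTable", new String[] {"id1", "id2", "v"}, new String[] {"id1", "id2"}));
    }
}
```

For MyTable(id1,id2) the first call yields a MERGE with only the WHEN NOT MATCHED branch, which is the behaviour the fix above is meant to produce.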
--
This message was sent by Atlassian Jira
(v8.20.10#820010)