This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.15 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 2273fe57660cad05943e018abcbeb2fec8d4325c Author: Roberto Santos <rsi...@gmail.com> AuthorDate: Sat Sep 4 08:40:16 2021 -0300 NIFI-9194: Upsert for Oracle12+ Fixes pr #5366. Fixes pr #5366. Replace tabchars for whitespaces. Fixes pr #5366. Replaced tabchars for whitespaces. Removed unnecessary comments. Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #5366 --- .../standard/db/impl/Oracle12DatabaseAdapter.java | 105 ++++++++++++++++++++- .../db/impl/TestOracle12DatabaseAdapter.java | 79 ++++++++++++++++ 2 files changed, 179 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java index 18f3ceb..63e7379 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java @@ -16,12 +16,14 @@ */ package org.apache.nifi.processors.standard.db.impl; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.processors.standard.db.DatabaseAdapter; -/** - * A database adapter that generates MS SQL Compatible SQL. 
- */ public class Oracle12DatabaseAdapter implements DatabaseAdapter { @Override public String getName() { @@ -34,12 +36,14 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter { } @Override - public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) { + public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, + Long limit, Long offset) { return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null); } @Override - public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) { + public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, + Long limit, Long offset, String columnForPartitioning) { if (StringUtils.isEmpty(tableName)) { throw new IllegalArgumentException("Table name cannot be null or empty"); } @@ -93,4 +97,95 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter { public String getTableAliasClause(String tableName) { return tableName; } + + @Override + public boolean supportsUpsert() { + return true; + } + + @Override + public String getUpsertStatement(String table, List<String> columnNames, Collection<String> uniqueKeyColumnNames) + throws IllegalArgumentException { + if (StringUtils.isEmpty(table)) { + throw new IllegalArgumentException("Table name cannot be null or blank"); + } + if (columnNames == null || columnNames.isEmpty()) { + throw new IllegalArgumentException("Column names cannot be null or empty"); + } + if (uniqueKeyColumnNames == null || uniqueKeyColumnNames.isEmpty()) { + throw new IllegalArgumentException("Key column names cannot be null or empty"); + } + + String newValuesAlias = "n"; + + String columns = columnNames.stream().collect(Collectors.joining(", ? ")); + + columns = "? 
" + columns; + + List<String> columnsAssignment = getColumnsAssignment(columnNames, newValuesAlias, table); + + List<String> conflictColumnsClause = getConflictColumnsClause(uniqueKeyColumnNames, columnsAssignment, table, + newValuesAlias); + String conflictClause = "(" + conflictColumnsClause.stream().collect(Collectors.joining(" AND ")) + ")"; + + String insertStatement = columnNames.stream().collect(Collectors.joining(", ")); + String insertValues = newValuesAlias + "." + + columnNames.stream().collect(Collectors.joining(", " + newValuesAlias + ".")); + + columnsAssignment.removeAll(conflictColumnsClause); + String updateStatement = columnsAssignment.stream().collect(Collectors.joining(", ")); + + StringBuilder statementStringBuilder = new StringBuilder("MERGE INTO ").append(table).append(" USING (SELECT ") + .append(columns).append(" FROM DUAL) ").append(newValuesAlias).append(" ON ").append(conflictClause) + .append(" WHEN NOT MATCHED THEN INSERT (").append(insertStatement).append(") VALUES (") + .append(insertValues).append(")").append(" WHEN MATCHED THEN UPDATE SET ").append(updateStatement); + + return statementStringBuilder.toString(); + } + + private List<String> getConflictColumnsClause(Collection<String> uniqueKeyColumnNames, List<String> conflictColumns, + String table, String newTableAlias) { + List<String> conflictColumnsClause = conflictColumns.stream() + .filter(column -> uniqueKeyColumnNames.stream().anyMatch( + uniqueKey -> column.equalsIgnoreCase(getColumnAssignment(table, uniqueKey, newTableAlias)))) + .collect(Collectors.toList()); + + if (conflictColumnsClause.isEmpty()) { + + // Try it with normalized columns + conflictColumnsClause = conflictColumns.stream() + .filter((column -> uniqueKeyColumnNames.stream() + .anyMatch(uniqueKey -> normalizeColumnName(column).equalsIgnoreCase( + normalizeColumnName(getColumnAssignment(table, uniqueKey, newTableAlias)))))) + .collect(Collectors.toList()); + } + + return conflictColumnsClause; + + } + + 
private String normalizeColumnName(final String colName) { + return colName == null ? null : colName.toUpperCase().replace("_", ""); + } + + private List<String> getColumnsAssignment(Collection<String> columnsNames, String newTableAlias, String table) { + List<String> conflictClause = new ArrayList<>(); + + for (String columnName : columnsNames) { + + StringBuilder statementStringBuilder = new StringBuilder(); + + statementStringBuilder.append(getColumnAssignment(table, columnName, newTableAlias)); + + conflictClause.add(statementStringBuilder.toString()); + + } + + return conflictClause; + } + + private String getColumnAssignment(String table, String columnName, String newTableAlias) { + return table + "." + columnName + " = " + newTableAlias + "." + columnName; + } + } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java index 2315e98..99d625a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java @@ -16,6 +16,15 @@ */ package org.apache.nifi.processors.standard.db.impl; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + import org.apache.nifi.processors.standard.db.DatabaseAdapter; import org.junit.Assert; import org.junit.Test; @@ -86,4 +95,74 @@ public class TestOracle12DatabaseAdapter { String expected4 = "SELECT 
some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename"; Assert.assertEquals(expected4, sql4); } + + @Test + public void testSupportsUpsert() throws Exception { + assertTrue(db.getClass().getSimpleName() + " should support upsert", db.supportsUpsert()); + } + + @Test + public void testGetUpsertStatementWithNullTableName() throws Exception { + testGetUpsertStatement(null, Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank")); + } + + @Test + public void testGetUpsertStatementWithBlankTableName() throws Exception { + testGetUpsertStatement("", Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank")); + } + + @Test + public void testGetUpsertStatementWithNullColumnNames() throws Exception { + testGetUpsertStatement("notEmpty", null, Arrays.asList("notEmpty"), new IllegalArgumentException("Column names cannot be null or empty")); + } + + @Test + public void testGetUpsertStatementWithEmptyColumnNames() throws Exception { + testGetUpsertStatement("notEmpty", Collections.emptyList(), Arrays.asList("notEmpty"), new IllegalArgumentException("Column names cannot be null or empty")); + } + + @Test + public void testGetUpsertStatementWithNullKeyColumnNames() throws Exception { + testGetUpsertStatement("notEmpty", Arrays.asList("notEmpty"), null, new IllegalArgumentException("Key column names cannot be null or empty")); + } + + @Test + public void testGetUpsertStatementWithEmptyKeyColumnNames() throws Exception { + testGetUpsertStatement("notEmpty", Arrays.asList("notEmpty"), Collections.emptyList(), new IllegalArgumentException("Key column names cannot be null or empty")); + } + + @Test + public void testGetUpsertStatement() throws Exception { + // GIVEN + String tableName = "table"; + List<String> columnNames = Arrays.asList("column1","column2", "column3", "column4"); + Collection<String> uniqueKeyColumnNames = 
Arrays.asList("column2","column4"); + + String expected = "MERGE INTO table USING (SELECT ? column1, ? column2, ? column3, ? column4 FROM DUAL) n" + + " ON (table.column2 = n.column2 AND table.column4 = n.column4) WHEN NOT MATCHED THEN" + + " INSERT (column1, column2, column3, column4) VALUES (n.column1, n.column2, n.column3, n.column4)" + + " WHEN MATCHED THEN UPDATE SET table.column1 = n.column1, table.column3 = n.column3"; + + // WHEN + // THEN + testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected); + } + + private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, IllegalArgumentException expected) { + try { + testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, (String)null); + fail(); + } catch (IllegalArgumentException e) { + assertEquals(expected.getMessage(), e.getMessage()); + } + } + + private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, String expected) { + // WHEN + String actual = db.getUpsertStatement(tableName, columnNames, uniqueKeyColumnNames); + + // THEN + assertEquals(expected, actual); + } + } \ No newline at end of file