This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 4fa47ecc2a NIFI-5151: Add UPSERT support for Apache Phoenix
4fa47ecc2a is described below

commit 4fa47ecc2ad02fe266bcfee38b954d00e7aeddf8
Author: Lehel Boér <lehe...@hotmail.com>
AuthorDate: Thu May 18 16:52:51 2023 +0200

    NIFI-5151: Add UPSERT support for Apache Phoenix
    
    Signed-off-by: Matthew Burgess <mattyb...@apache.org>
---
 .../standard/db/impl/PhoenixDatabaseAdapter.java   |  33 ++++++
 ...che.nifi.processors.standard.db.DatabaseAdapter |   3 +-
 .../db/impl/TestPhoenixDatabaseAdapter.java        | 118 +++++++++++++++++++++
 3 files changed, 153 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
index c19efe51ca..86d51e7f38 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
@@ -23,8 +23,10 @@ import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 import java.sql.JDBCType;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static java.sql.Types.CHAR;
 import static java.sql.Types.CLOB;
@@ -106,6 +108,37 @@ public final class PhoenixDatabaseAdapter implements DatabaseAdapter {
         return query.toString();
     }
 
+    @Override
+    public String getUpsertStatement(String table, List<String> columnNames, Collection<String> uniqueKeyColumnNames) {
+        if (org.apache.nifi.util.StringUtils.isEmpty(table)) {
+            throw new IllegalArgumentException("Table name cannot be null or blank");
+        }
+        if (columnNames == null || columnNames.isEmpty()) {
+            throw new IllegalArgumentException("Column names cannot be null or empty");
+        }
+        if (uniqueKeyColumnNames == null || uniqueKeyColumnNames.isEmpty()) {
+            throw new IllegalArgumentException("Key column names cannot be null or empty");
+        }
+
+        String columns = String.join(", ", columnNames);
+
+        String parameterizedUpsertValues = columnNames.stream()
+                .map(columnName -> "?")
+                .collect(Collectors.joining(", "));
+
+        StringBuilder statementStringBuilder = new StringBuilder("UPSERT INTO ")
+                .append(table)
+                .append("(").append(columns).append(")")
+                .append(" VALUES ")
+                .append("(").append(parameterizedUpsertValues).append(")");
+        return statementStringBuilder.toString();
+    }
+
+    @Override
+    public boolean supportsUpsert() {
+        return true;
+    }
+
     @Override
     public boolean supportsCreateTableIfNotExists() {
         return true;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
index f104782c5b..641223d21b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
@@ -18,4 +18,5 @@ org.apache.nifi.processors.standard.db.impl.Oracle12DatabaseAdapter
 org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter
 org.apache.nifi.processors.standard.db.impl.MSSQL2008DatabaseAdapter
 org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter
-org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter
\ No newline at end of file
+org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter
+org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPhoenixDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPhoenixDatabaseAdapter.java
new file mode 100644
index 0000000000..7234fe57e9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPhoenixDatabaseAdapter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestPhoenixDatabaseAdapter {
+    private PhoenixDatabaseAdapter testSubject;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        testSubject = new PhoenixDatabaseAdapter();
+    }
+
+    @Test
+    void testSupportsUpsert() {
+        assertTrue(testSubject.supportsUpsert(), testSubject.getClass().getSimpleName() + " should support upsert");
+    }
+
+    @Test
+    void testGetUpsertStatementWithNullTableName() {
+        testGetUpsertStatement(null, Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank"));
+    }
+
+    @Test
+    void testGetUpsertStatementWithBlankTableName() {
+        testGetUpsertStatement("", Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank"));
+    }
+
+    @Test
+    void testGetUpsertStatementWithNullColumnNames() {
+        testGetUpsertStatement("notEmpty", null, Arrays.asList("notEmpty"), new IllegalArgumentException("Column names cannot be null or empty"));
+    }
+
+    @Test
+    void testGetUpsertStatementWithEmptyColumnNames() {
+        testGetUpsertStatement("notEmpty", Collections.emptyList(), Arrays.asList("notEmpty"), new IllegalArgumentException("Column names cannot be null or empty"));
+    }
+
+    @Test
+    void testGetUpsertStatementWithNullKeyColumnNames() {
+        testGetUpsertStatement("notEmpty", Arrays.asList("notEmpty"), null, new IllegalArgumentException("Key column names cannot be null or empty"));
+    }
+
+    @Test
+    void testGetUpsertStatementWithEmptyKeyColumnNames() {
+        testGetUpsertStatement("notEmpty", Arrays.asList("notEmpty"), Collections.emptyList(), new IllegalArgumentException("Key column names cannot be null or empty"));
+    }
+
+    @Test
+    void testGetUpsertStatement() {
+        // GIVEN
+        String tableName = "table";
+        List<String> columnNames = Arrays.asList("column1", "column2", "column3", "column4");
+        Collection<String> uniqueKeyColumnNames = Arrays.asList("column2", "column4");
+
+        String expected = "UPSERT INTO" +
+                " table(column1, column2, column3, column4) VALUES (?, ?, ?, ?)";
+
+        // WHEN
+        // THEN
+        testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected);
+    }
+
+    private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, IllegalArgumentException expected) {
+        final IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> {
+            testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, (String) null);
+        });
+        assertEquals(expected.getMessage(), e.getMessage());
+    }
+
+    private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, String expected) {
+        // WHEN
+        String actual = testSubject.getUpsertStatement(tableName, columnNames, uniqueKeyColumnNames);
+
+        // THEN
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testGetUpsertStatementQuoted() {
+        // GIVEN
+        String tableName = "\"table\"";
+        List<String> columnNames = Arrays.asList("column1", "\"column2\"", "column3", "column4");
+        Collection<String> uniqueKeyColumnNames = Arrays.asList("\"column2\"", "column4");
+
+        String expected = "UPSERT INTO" +
+                " \"table\"(column1, \"column2\", column3, column4) VALUES (?, ?, ?, ?)";
+
+        // WHEN
+        // THEN
+        testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected);
+    }
+}

Reply via email to