This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d4344a3140 NIFI-13642 Added Delete Keys Property to PutDatabaseRecord
d4344a3140 is described below

commit d4344a3140722cb69202f8b58f190127471d5d30
Author: Rajmund Takacs <tak...@gmail.com>
AuthorDate: Tue Aug 6 16:48:12 2024 +0200

    NIFI-13642 Added Delete Keys Property to PutDatabaseRecord
    
    - Delete Keys property enables targeted deletes for databases that do not support primary keys
    
    This closes #9162
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../processors/standard/PutDatabaseRecord.java     | 102 +++++++++++++--------
 .../processors/standard/PutDatabaseRecordTest.java |   8 +-
 2 files changed, 70 insertions(+), 40 deletions(-)

diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 541eca9613..a3cfd0c375 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -16,8 +16,43 @@
  */
 package org.apache.nifi.processors.standard;
 
+import static java.lang.String.format;
+import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT;
+import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.BatchUpdateException;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.SQLDataException;
+import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
+import java.sql.SQLTransientException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.HexFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -60,40 +95,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.sql.BatchUpdateException;
-import java.sql.Clob;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
-import java.sql.SQLDataException;
-import java.sql.SQLException;
-import java.sql.SQLIntegrityConstraintViolationException;
-import java.sql.SQLTransientException;
-import java.sql.Statement;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.HexFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-
-import static java.lang.String.format;
-import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT;
-import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
-import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
-
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"sql", "record", "jdbc", "put", "database", "update", "insert", 
"delete"})
 @CapabilityDescription("The PutDatabaseRecord processor uses a specified 
RecordReader to input (possibly multiple) records from an incoming flow file. 
These records are translated to SQL "
@@ -306,6 +307,17 @@ public class PutDatabaseRecord extends AbstractProcessor {
             .dependsOn(STATEMENT_TYPE, UPDATE_TYPE, UPSERT_TYPE, SQL_TYPE, 
USE_ATTR_TYPE, USE_RECORD_PATH)
             .build();
 
+    static final PropertyDescriptor DELETE_KEYS = new Builder()
+            .name("Delete Keys")
+            .description("A comma-separated list of column names that uniquely 
identifies a row in the database for DELETE statements. "
+                    + "If the Statement Type is DELETE and this property is 
not set, the table's columns are used. "
+                    + "This property is ignored if the Statement Type is not 
DELETE")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .dependsOn(STATEMENT_TYPE, DELETE_TYPE, SQL_TYPE, USE_ATTR_TYPE, 
USE_RECORD_PATH)
+            .build();
+
     static final PropertyDescriptor FIELD_CONTAINING_SQL = new Builder()
             .name("put-db-record-field-containing-sql")
             .displayName("Field Containing SQL")
@@ -427,6 +439,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 UNMATCHED_FIELD_BEHAVIOR,
                 UNMATCHED_COLUMN_BEHAVIOR,
                 UPDATE_KEYS,
+                DELETE_KEYS,
                 FIELD_CONTAINING_SQL,
                 ALLOW_MULTIPLE_STATEMENTS,
                 QUOTE_IDENTIFIERS,
@@ -730,6 +743,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         final String schemaName = 
context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
         final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
         final String updateKeys = 
context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
+        final String deleteKeys = 
context.getProperty(DELETE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
         final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
         final int timeoutMillis = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
 
@@ -795,7 +809,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                         } else if 
(UPDATE_TYPE.equalsIgnoreCase(statementType)) {
                             sqlHolder = generateUpdate(recordSchema, 
fqTableName, updateKeys, tableSchema, settings);
                         } else if 
(DELETE_TYPE.equalsIgnoreCase(statementType)) {
-                            sqlHolder = generateDelete(recordSchema, 
fqTableName, tableSchema, settings);
+                            sqlHolder = generateDelete(recordSchema, 
fqTableName, deleteKeys, tableSchema, settings);
                         } else if 
(UPSERT_TYPE.equalsIgnoreCase(statementType)) {
                             sqlHolder = generateUpsert(recordSchema, 
fqTableName, updateKeys, tableSchema, settings);
                         } else if 
(INSERT_IGNORE_TYPE.equalsIgnoreCase(statementType)) {
@@ -1443,7 +1457,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         return new SqlAndIncludedColumns(sqlBuilder.toString(), 
includedColumns);
     }
 
-    SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, 
final String tableName, final TableSchema tableSchema, final DMLSettings 
settings)
+    SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, 
final String tableName, String deleteKeys, final TableSchema tableSchema, final 
DMLSettings settings)
             throws IllegalArgumentException, MalformedRecordException, 
SQLDataException {
 
         final Set<String> normalizedFieldNames = 
getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
@@ -1465,14 +1479,28 @@ public class PutDatabaseRecord extends 
AbstractProcessor {
         sqlBuilder.append(tableName);
 
         // iterate over all of the fields in the record, building the SQL 
statement by adding the column names
-        List<String> fieldNames = recordSchema.getFieldNames();
+        final List<String> fieldNames = recordSchema.getFieldNames();
         final List<Integer> includedColumns = new ArrayList<>();
         if (fieldNames != null) {
             sqlBuilder.append(" WHERE ");
             int fieldCount = fieldNames.size();
             AtomicInteger fieldsFound = new AtomicInteger(0);
 
+            // If 'deleteKeys' is not specified by the user, then all columns 
of the table
+            // should be used in the 'WHERE' clause, in order to keep the 
original behavior.
+            final Set<String> deleteKeysSet;
+            if (deleteKeys == null) {
+                deleteKeysSet = new HashSet<>(fieldNames);
+            } else {
+                deleteKeysSet = Arrays.stream(deleteKeys.split(","))
+                        .map(String::trim)
+                        .collect(Collectors.toSet());
+            }
+
             for (int i = 0; i < fieldCount; i++) {
+                if (!deleteKeysSet.contains(fieldNames.get(i))) {
+                    continue; // skip this field if it should not be included 
in 'WHERE'
+                }
 
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index b03197a731..307c230ced 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -405,7 +405,9 @@ public class PutDatabaseRecordTest {
         assertEquals("UPDATE PERSONS SET name = ?, code = ? WHERE id = ?",
                 processor.generateUpdate(schema, "PERSONS", null, tableSchema, 
settings).getSql());
         assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (name = ? OR 
(name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))",
-                processor.generateDelete(schema, "PERSONS", tableSchema, 
settings).getSql());
+                processor.generateDelete(schema, "PERSONS", null, tableSchema, 
settings).getSql());
+        assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (code = ? OR 
(code is null AND ? is null))",
+                processor.generateDelete(schema, "PERSONS", "id, code", 
tableSchema, settings).getSql());
     }
 
     @Test
@@ -450,7 +452,7 @@ public class PutDatabaseRecordTest {
         assertEquals("Cannot map field 'non_existing' to any column in the 
database\nColumns: id,name,code", e.getMessage());
 
         e = assertThrows(SQLDataException.class,
-                () -> processor.generateDelete(schema, "PERSONS", tableSchema, 
settings),
+                () -> processor.generateDelete(schema, "PERSONS", null, 
tableSchema, settings),
                 "generateDelete should fail with unmatched fields");
         assertEquals("Cannot map field 'non_existing' to any column in the 
database\nColumns: id,name,code", e.getMessage());
     }
@@ -2495,4 +2497,4 @@ public class PutDatabaseRecordTest {
                     "; batchSize=" + String.valueOf(batchSize);
         }
     }
-}
\ No newline at end of file
+}

Reply via email to