This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new d4344a3140 NIFI-13642 Added Delete Keys Property to PutDatabaseRecord d4344a3140 is described below commit d4344a3140722cb69202f8b58f190127471d5d30 Author: Rajmund Takacs <tak...@gmail.com> AuthorDate: Tue Aug 6 16:48:12 2024 +0200 NIFI-13642 Added Delete Keys Property to PutDatabaseRecord - Delete Keys property enables targeted deletes for databases that do not support primary keys This closes #9162 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../processors/standard/PutDatabaseRecord.java | 102 +++++++++++++-------- .../processors/standard/PutDatabaseRecordTest.java | 8 +- 2 files changed, 70 insertions(+), 40 deletions(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 541eca9613..a3cfd0c375 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -16,8 +16,43 @@ */ package org.apache.nifi.processors.standard; +import static java.lang.String.format; +import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.expression.ExpressionLanguageScope.NONE; + import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.sql.BatchUpdateException; +import 
java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.SQLDataException; +import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.sql.SQLTransientException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.HexFormat; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -60,40 +95,6 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.sql.BatchUpdateException; -import java.sql.Clob; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.PreparedStatement; -import java.sql.SQLDataException; -import java.sql.SQLException; -import java.sql.SQLIntegrityConstraintViolationException; -import java.sql.SQLTransientException; -import java.sql.Statement; -import java.sql.Types; -import java.util.ArrayList; -import java.util.Base64; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.HexFormat; -import java.util.List; -import java.util.Map; -import java.util.ServiceLoader; -import java.util.Set; -import 
java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; - -import static java.lang.String.format; -import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT; -import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; -import static org.apache.nifi.expression.ExpressionLanguageScope.NONE; - @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"sql", "record", "jdbc", "put", "database", "update", "insert", "delete"}) @CapabilityDescription("The PutDatabaseRecord processor uses a specified RecordReader to input (possibly multiple) records from an incoming flow file. These records are translated to SQL " @@ -306,6 +307,17 @@ public class PutDatabaseRecord extends AbstractProcessor { .dependsOn(STATEMENT_TYPE, UPDATE_TYPE, UPSERT_TYPE, SQL_TYPE, USE_ATTR_TYPE, USE_RECORD_PATH) .build(); + static final PropertyDescriptor DELETE_KEYS = new Builder() + .name("Delete Keys") + .description("A comma-separated list of column names that uniquely identifies a row in the database for DELETE statements. " + + "If the Statement Type is DELETE and this property is not set, the table's columns are used. 
" + + "This property is ignored if the Statement Type is not DELETE") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .dependsOn(STATEMENT_TYPE, DELETE_TYPE, SQL_TYPE, USE_ATTR_TYPE, USE_RECORD_PATH) + .build(); + static final PropertyDescriptor FIELD_CONTAINING_SQL = new Builder() .name("put-db-record-field-containing-sql") .displayName("Field Containing SQL") @@ -427,6 +439,7 @@ public class PutDatabaseRecord extends AbstractProcessor { UNMATCHED_FIELD_BEHAVIOR, UNMATCHED_COLUMN_BEHAVIOR, UPDATE_KEYS, + DELETE_KEYS, FIELD_CONTAINING_SQL, ALLOW_MULTIPLE_STATEMENTS, QUOTE_IDENTIFIERS, @@ -730,6 +743,7 @@ public class PutDatabaseRecord extends AbstractProcessor { final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue(); + final String deleteKeys = context.getProperty(DELETE_KEYS).evaluateAttributeExpressions(flowFile).getValue(); final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger(); final int timeoutMillis = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(); @@ -795,7 +809,7 @@ public class PutDatabaseRecord extends AbstractProcessor { } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) { sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings); } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) { - sqlHolder = generateDelete(recordSchema, fqTableName, tableSchema, settings); + sqlHolder = generateDelete(recordSchema, fqTableName, deleteKeys, tableSchema, settings); } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) { sqlHolder = 
generateUpsert(recordSchema, fqTableName, updateKeys, tableSchema, settings); } else if (INSERT_IGNORE_TYPE.equalsIgnoreCase(statementType)) { @@ -1443,7 +1457,7 @@ public class PutDatabaseRecord extends AbstractProcessor { return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns); } - SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName, final TableSchema tableSchema, final DMLSettings settings) + SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName, String deleteKeys, final TableSchema tableSchema, final DMLSettings settings) throws IllegalArgumentException, MalformedRecordException, SQLDataException { final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames); @@ -1465,14 +1479,28 @@ public class PutDatabaseRecord extends AbstractProcessor { sqlBuilder.append(tableName); // iterate over all of the fields in the record, building the SQL statement by adding the column names - List<String> fieldNames = recordSchema.getFieldNames(); + final List<String> fieldNames = recordSchema.getFieldNames(); final List<Integer> includedColumns = new ArrayList<>(); if (fieldNames != null) { sqlBuilder.append(" WHERE "); int fieldCount = fieldNames.size(); AtomicInteger fieldsFound = new AtomicInteger(0); + // If 'deleteKeys' is not specified by the user, then all fields of the record + // should be used in the 'WHERE' clause, in order to keep the original behavior.
+ final Set<String> deleteKeysSet; + if (deleteKeys == null) { + deleteKeysSet = new HashSet<>(fieldNames); + } else { + deleteKeysSet = Arrays.stream(deleteKeys.split(",")) + .map(String::trim) + .collect(Collectors.toSet()); + } + for (int i = 0; i < fieldCount; i++) { + if (!deleteKeysSet.contains(fieldNames.get(i))) { + continue; // skip this field if it should not be included in 'WHERE' + } RecordField field = recordSchema.getField(i); String fieldName = field.getFieldName(); diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java index b03197a731..307c230ced 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java @@ -405,7 +405,9 @@ public class PutDatabaseRecordTest { assertEquals("UPDATE PERSONS SET name = ?, code = ? WHERE id = ?", processor.generateUpdate(schema, "PERSONS", null, tableSchema, settings).getSql()); assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))", - processor.generateDelete(schema, "PERSONS", tableSchema, settings).getSql()); + processor.generateDelete(schema, "PERSONS", null, tableSchema, settings).getSql()); + assertEquals("DELETE FROM PERSONS WHERE (id = ?) AND (code = ? OR (code is null AND ? 
is null))", + processor.generateDelete(schema, "PERSONS", "id, code", tableSchema, settings).getSql()); } @Test @@ -450,7 +452,7 @@ public class PutDatabaseRecordTest { assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage()); e = assertThrows(SQLDataException.class, - () -> processor.generateDelete(schema, "PERSONS", tableSchema, settings), + () -> processor.generateDelete(schema, "PERSONS", null, tableSchema, settings), "generateDelete should fail with unmatched fields"); assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage()); } @@ -2495,4 +2497,4 @@ public class PutDatabaseRecordTest { "; batchSize=" + String.valueOf(batchSize); } } -} \ No newline at end of file +}