This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 42bd5243bb NIFI-12887 Added Binary String Format property to PutDatabaseRecord
42bd5243bb is described below

commit 42bd5243bb3e32b21559f628126d1795e11c5c30
Author: tpalfy <tpa...@apache.org>
AuthorDate: Tue Mar 12 13:33:01 2024 +0100

    NIFI-12887 Added Binary String Format property to PutDatabaseRecord
    
    - Supports handling Strings as hexadecimal character sequences or base64-encoded binary data when inserting into a binary type column
    
    This closes #8493
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../processors/standard/PutDatabaseRecord.java     | 43 ++++++++++-
 .../processors/standard/PutDatabaseRecordTest.java | 89 +++++++++++++++++++++-
 2 files changed, 130 insertions(+), 2 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 1f60208f40..26b4281cd0 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -77,10 +77,12 @@ import java.sql.SQLTransientException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.HexFormat;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -238,6 +240,34 @@ public class PutDatabaseRecord extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    static final AllowableValue BINARY_STRING_FORMAT_UTF8 = new AllowableValue(
+            "UTF-8",
+            "UTF-8",
+            "String values for binary columns contain the original value as 
text via UTF-8 character encoding"
+    );
+
+    static final AllowableValue BINARY_STRING_FORMAT_HEXADECIMAL = new 
AllowableValue(
+            "Hexadecimal",
+            "Hexadecimal",
+            "String values for binary columns contain the original value in 
hexadecimal format"
+    );
+
+    static final AllowableValue BINARY_STRING_FORMAT_BASE64 = new 
AllowableValue(
+            "Base64",
+            "Base64",
+            "String values for binary columns contain the original value in 
Base64 encoded format"
+    );
+
+    static final PropertyDescriptor BINARY_STRING_FORMAT = new Builder()
+            .name("put-db-record-binary-format")
+            .displayName("Binary String Format")
+            .description("The format to be applied when decoding string values 
to binary.")
+            .required(true)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .allowableValues(BINARY_STRING_FORMAT_UTF8, 
BINARY_STRING_FORMAT_HEXADECIMAL, BINARY_STRING_FORMAT_BASE64)
+            .defaultValue(BINARY_STRING_FORMAT_UTF8.getValue())
+            .build();
+
     static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new Builder()
             .name("put-db-record-translate-field-names")
             .displayName("Translate Field Names")
@@ -388,6 +418,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         pds.add(CATALOG_NAME);
         pds.add(SCHEMA_NAME);
         pds.add(TABLE_NAME);
+        pds.add(BINARY_STRING_FORMAT);
         pds.add(TRANSLATE_FIELD_NAMES);
         pds.add(UNMATCHED_FIELD_BEHAVIOR);
         pds.add(UNMATCHED_COLUMN_BEHAVIOR);
@@ -606,6 +637,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
         final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
         final int timeoutMillis = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
 
+        final String binaryStringFormat = 
context.getProperty(BINARY_STRING_FORMAT).evaluateAttributeExpressions(flowFile).getValue();
+
         // Ensure the table name has been set, the generated SQL statements 
(and TableSchema cache) will need it
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException(format("Cannot process %s 
because Table Name is null or empty", flowFile));
@@ -764,7 +797,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
                                             }
                                             currentValue = dest;
                                         } else if (currentValue instanceof 
String) {
-                                            currentValue = ((String) 
currentValue).getBytes(StandardCharsets.UTF_8);
+                                            final String stringValue = 
(String) currentValue;
+
+                                            if 
(BINARY_STRING_FORMAT_BASE64.getValue().equals(binaryStringFormat)) {
+                                                currentValue = 
Base64.getDecoder().decode(stringValue);
+                                            } else if 
(BINARY_STRING_FORMAT_HEXADECIMAL.getValue().equals(binaryStringFormat)) {
+                                                currentValue = 
HexFormat.of().parseHex(stringValue);
+                                            } else {
+                                                currentValue = 
stringValue.getBytes(StandardCharsets.UTF_8);
+                                            }
                                         } else if (currentValue != null && 
!(currentValue instanceof byte[])) {
                                             throw new 
IllegalTypeConversionException("Cannot convert value " + currentValue + " to 
BLOB/BINARY/VARBINARY/LONGVARBINARY");
                                         }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index e6483efc7f..70a908b377 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -61,6 +61,7 @@ import java.time.LocalDate;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1686,6 +1687,88 @@ public class PutDatabaseRecordTest {
         conn.close();
     }
 
+    @Test
+    void testInsertHexStringIntoBinary() throws Exception {
+        runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, 
PutDatabaseRecord.BINARY_STRING_FORMAT_HEXADECIMAL);
+
+        String tableName = "HEX_STRING_TEST";
+        String createTable = "CREATE TABLE " + tableName + " (id integer 
primary key, binary_data blob)";
+        String hexStringData = "abCDef";
+
+        recreateTable(tableName, createTable);
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("binaryData", RecordFieldType.STRING);
+
+        parser.addRecord(1, hexStringData);
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, tableName);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
+        final Connection conn = dbcp.getConnection();
+        final Statement stmt = conn.createStatement();
+
+        final ResultSet resultSet = stmt.executeQuery("SELECT * FROM " + 
tableName);
+        assertTrue(resultSet.next());
+
+        assertEquals(1, resultSet.getInt(1));
+
+        Blob blob = resultSet.getBlob(2);
+        assertArrayEquals(new byte[]{(byte)171, (byte)205, (byte)239}, 
blob.getBytes(1, (int)blob.length()));
+
+        stmt.close();
+        conn.close();
+    }
+
+    @Test
+    void testInsertBase64StringIntoBinary() throws Exception {
+        runner.setProperty(PutDatabaseRecord.BINARY_STRING_FORMAT, 
PutDatabaseRecord.BINARY_STRING_FORMAT_BASE64);
+
+        String tableName = "BASE64_STRING_TEST";
+        String createTable = "CREATE TABLE " + tableName + " (id integer 
primary key, binary_data blob)";
+        byte[] binaryData = {(byte) 10, (byte) 103, (byte) 234};
+
+        recreateTable(tableName, createTable);
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("binaryData", RecordFieldType.STRING);
+
+        parser.addRecord(1, Base64.getEncoder().encodeToString(binaryData));
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, tableName);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
+        final Connection conn = dbcp.getConnection();
+        final Statement stmt = conn.createStatement();
+
+        final ResultSet resultSet = stmt.executeQuery("SELECT * FROM " + 
tableName);
+        assertTrue(resultSet.next());
+
+        assertEquals(1, resultSet.getInt(1));
+
+        Blob blob = resultSet.getBlob(2);
+        assertArrayEquals(binaryData, blob.getBytes(1, (int)blob.length()));
+
+        stmt.close();
+        conn.close();
+    }
+
     @Test
     void testInsertWithBlobClobObjectArraySource() throws Exception {
         String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary 
key, name clob," +
@@ -1959,10 +2042,14 @@ public class PutDatabaseRecordTest {
     }
 
     private void recreateTable(String createSQL) throws ProcessException, 
SQLException {
+        recreateTable("PERSONS", createSQL);
+    }
+
+    private void recreateTable(String tableName, String createSQL) throws 
ProcessException, SQLException {
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
         try {
-            stmt.execute("drop table PERSONS");
+            stmt.execute("drop table " + tableName);
         } catch (SQLException ignore) {
             // Do nothing, may not have existed
         }

Reply via email to