Repository: nifi
Updated Branches:
  refs/heads/master afb9a0016 -> e210172d9


PutSQLBinary

Signed-off-by: Matt Burgess <mattyb...@apache.org>

NIFI-2591

Signed-off-by: Matt Burgess <mattyb...@apache.org>

NIFI-2591

Signed-off-by: Matt Burgess <mattyb...@apache.org>

NIFI-2591 - Added Format option for binary data types. Updated unit tests.

Signed-off-by: Matt Burgess <mattyb...@apache.org>

This closes #883


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e210172d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e210172d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e210172d

Branch: refs/heads/master
Commit: e210172d93d6d5be45c061039bb7e0a2896c6e14
Parents: afb9a00
Author: Peter Wicks <pwi...@micron.com>
Authored: Wed Aug 17 09:06:09 2016 -0600
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Thu Aug 25 08:41:23 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/PutSQL.java | 180 +++++++++++--------
 .../nifi/processors/standard/TestPutSQL.java    | 161 ++++++++++++++++-
 2 files changed, 267 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e210172d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 910112e..8e87c56 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -39,8 +39,11 @@ import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.StreamUtils;
 
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.sql.BatchUpdateException;
@@ -76,78 +79,84 @@ import java.util.regex.Pattern;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
 @CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content 
of an incoming FlowFile is expected to be the SQL command "
-    + "to execute. The SQL command may use the ? to escape parameters. In this 
case, the parameters to use must exist as FlowFile attributes "
-    + "with the naming convention sql.args.N.type and sql.args.N.value, where 
N is a positive integer. The sql.args.N.type is expected to be "
-    + "a number indicating the JDBC Type. The content of the FlowFile is 
expected to be in UTF-8 format.")
+        + "to execute. The SQL command may use the ? to escape parameters. In 
this case, the parameters to use must exist as FlowFile attributes "
+        + "with the naming convention sql.args.N.type and sql.args.N.value, 
where N is a positive integer. The sql.args.N.type is expected to be "
+        + "a number indicating the JDBC Type. The content of the FlowFile is 
expected to be in UTF-8 format.")
 @ReadsAttributes({
-    @ReadsAttribute(attribute = "fragment.identifier", description = "If the 
<Support Fragment Transactions> property is true, this attribute is used to 
determine whether or "
-        + "not two FlowFiles belong to the same transaction."),
-    @ReadsAttribute(attribute = "fragment.count", description = "If the 
<Support Fragment Transactions> property is true, this attribute is used to 
determine how many FlowFiles "
-        + "are needed to complete the transaction."),
-    @ReadsAttribute(attribute = "fragment.index", description = "If the 
<Support Fragment Transactions> property is true, this attribute is used to 
determine the order that the FlowFiles "
-        + "in a transaction should be evaluated."),
-    @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming 
FlowFiles are expected to be parameterized SQL statements. The type of each 
Parameter is specified as an integer "
-        + "that represents the JDBC Type of the parameter."),
-    @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming 
FlowFiles are expected to be parameterized SQL statements. The value of the 
Parameters are specified as "
-        + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. 
The type of the sql.args.1.value Parameter is specified by the sql.args.1.type 
attribute.")
+        @ReadsAttribute(attribute = "fragment.identifier", description = "If 
the <Support Fragment Transactions> property is true, this attribute is used to 
determine whether or "
+                + "not two FlowFiles belong to the same transaction."),
+        @ReadsAttribute(attribute = "fragment.count", description = "If the 
<Support Fragment Transactions> property is true, this attribute is used to 
determine how many FlowFiles "
+                + "are needed to complete the transaction."),
+        @ReadsAttribute(attribute = "fragment.index", description = "If the 
<Support Fragment Transactions> property is true, this attribute is used to 
determine the order that the FlowFiles "
+                + "in a transaction should be evaluated."),
+        @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming 
FlowFiles are expected to be parameterized SQL statements. The type of each 
Parameter is specified as an integer "
+                + "that represents the JDBC Type of the parameter."),
+        @ReadsAttribute(attribute = "sql.args.N.value", description = 
"Incoming FlowFiles are expected to be parameterized SQL statements. The value 
of the Parameters are specified as "
+                + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and 
so on. The type of the sql.args.1.value Parameter is specified by the 
sql.args.1.type attribute."),
+        @ReadsAttribute(attribute = "sql.args.N.format", description = "This 
attribute is always optional, but default options may not always work for your 
data. "
+                + "Incoming FlowFiles are expected to be parameterized SQL 
statements. In some cases "
+                + "a format option needs to be specified, currently this is 
only applicable for binary data types. For binary data types "
+                + "available options are 'ascii', 'base64' and 'hex'.  In 
'ascii' format each string character in your attribute value represents a 
single byte, this is the default format "
+                + "and the format provided by Avro Processors. In 'base64' 
format your string is a Base64 encoded string.  In 'hex' format the string is 
hex encoded with all "
+                + "letters in upper case and no '0x' at the beginning.")
 })
 @WritesAttributes({
-    @WritesAttribute(attribute = "sql.generated.key", description = "If the 
database generated a key for an INSERT statement and the Obtain Generated Keys 
property is set to true, "
-        + "this attribute will be added to indicate the generated key, if 
possible. This feature is not supported by all database vendors.")
+        @WritesAttribute(attribute = "sql.generated.key", description = "If 
the database generated a key for an INSERT statement and the Obtain Generated 
Keys property is set to true, "
+                + "this attribute will be added to indicate the generated key, 
if possible. This feature is not supported by all database vendors.")
 })
 public class PutSQL extends AbstractProcessor {
 
     static final PropertyDescriptor CONNECTION_POOL = new 
PropertyDescriptor.Builder()
-        .name("JDBC Connection Pool")
-        .description("Specifies the JDBC Connection Pool to use in order to 
convert the JSON message to a SQL statement. "
-            + "The Connection Pool is necessary in order to determine the 
appropriate database column types.")
-        .identifiesControllerService(DBCPService.class)
-        .required(true)
-        .build();
+            .name("JDBC Connection Pool")
+            .description("Specifies the JDBC Connection Pool to use in order 
to convert the JSON message to a SQL statement. "
+                    + "The Connection Pool is necessary in order to determine 
the appropriate database column types.")
+            .identifiesControllerService(DBCPService.class)
+            .required(true)
+            .build();
     static final PropertyDescriptor SUPPORT_TRANSACTIONS = new 
PropertyDescriptor.Builder()
-        .name("Support Fragmented Transactions")
-        .description("If true, when a FlowFile is consumed by this Processor, 
the Processor will first check the fragment.identifier and fragment.count 
attributes of that FlowFile. "
-            + "If the fragment.count value is greater than 1, the Processor 
will not process any FlowFile will that fragment.identifier until all are 
available; "
-            + "at that point, it will process all FlowFiles with that 
fragment.identifier as a single transaction, in the order specified by the 
FlowFiles' fragment.index attributes. "
-            + "This Provides atomicity of those SQL statements. If this value 
is false, these attributes will be ignored and the updates will occur 
independent of one another.")
-        .allowableValues("true", "false")
-        .defaultValue("true")
-        .build();
+            .name("Support Fragmented Transactions")
+            .description("If true, when a FlowFile is consumed by this 
Processor, the Processor will first check the fragment.identifier and 
fragment.count attributes of that FlowFile. "
+                    + "If the fragment.count value is greater than 1, the 
Processor will not process any FlowFile will that fragment.identifier until all 
are available; "
+                    + "at that point, it will process all FlowFiles with that 
fragment.identifier as a single transaction, in the order specified by the 
FlowFiles' fragment.index attributes. "
+                    + "This Provides atomicity of those SQL statements. If 
this value is false, these attributes will be ignored and the updates will 
occur independent of one another.")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
     static final PropertyDescriptor TRANSACTION_TIMEOUT = new 
PropertyDescriptor.Builder()
-        .name("Transaction Timeout")
-        .description("If the <Support Fragmented Transactions> property is set 
to true, specifies how long to wait for all FlowFiles for a particular 
fragment.identifier attribute "
-            + "to arrive before just transferring all of the FlowFiles with 
that identifier to the 'failure' relationship")
-        .required(false)
-        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-        .build();
+            .name("Transaction Timeout")
+            .description("If the <Support Fragmented Transactions> property is 
set to true, specifies how long to wait for all FlowFiles for a particular 
fragment.identifier attribute "
+                    + "to arrive before just transferring all of the FlowFiles 
with that identifier to the 'failure' relationship")
+            .required(false)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
     static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
-        .name("Batch Size")
-        .description("The preferred number of FlowFiles to put to the database 
in a single transaction")
-        .required(true)
-        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-        .defaultValue("100")
-        .build();
+            .name("Batch Size")
+            .description("The preferred number of FlowFiles to put to the 
database in a single transaction")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .build();
     static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new 
PropertyDescriptor.Builder()
-        .name("Obtain Generated Keys")
-        .description("If true, any key that is automatically generated by the 
database will be added to the FlowFile that generated it using the 
sql.generate.key attribute. "
-            + "This may result in slightly slower performance and is not 
supported by all databases.")
-        .allowableValues("true", "false")
-        .defaultValue("false")
-        .build();
+            .name("Obtain Generated Keys")
+            .description("If true, any key that is automatically generated by 
the database will be added to the FlowFile that generated it using the 
sql.generate.key attribute. "
+                    + "This may result in slightly slower performance and is 
not supported by all databases.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("A FlowFile is routed to this relationship after the 
database is successfully updated")
-        .build();
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the 
database is successfully updated")
+            .build();
     static final Relationship REL_RETRY = new Relationship.Builder()
-        .name("retry")
-        .description("A FlowFile is routed to this relationship if the 
database cannot be updated but attempting the operation again may succeed")
-        .build();
+            .name("retry")
+            .description("A FlowFile is routed to this relationship if the 
database cannot be updated but attempting the operation again may succeed")
+            .build();
     static final Relationship REL_FAILURE = new Relationship.Builder()
-        .name("failure")
-        .description("A FlowFile is routed to this relationship if the 
database cannot be updated and retrying the operation will also fail, "
-            + "such as an invalid query or an integrity constraint violation")
-        .build();
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the 
database cannot be updated and retrying the operation will also fail, "
+                    + "such as an invalid query or an integrity constraint 
violation")
+            .build();
 
     private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = 
Pattern.compile("sql\\.args\\.(\\d+)\\.type");
     private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
@@ -294,7 +303,7 @@ public class PutSQL extends AbstractProcessor {
                             conn.rollback();
                             final FlowFile offendingFlowFile = 
batchFlowFiles.get(offendingFlowFileIndex);
                             getLogger().error("Failed to update database due 
to a failed batch update. A total of {} FlowFiles are required for this 
transaction, so routing all to failure. "
-                                + "Offending FlowFile was {}, which caused the 
following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e});
+                                    + "Offending FlowFile was {}, which caused 
the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, 
e});
                             session.transfer(flowFiles, REL_FAILURE);
                             return;
                         }
@@ -337,7 +346,7 @@ public class PutSQL extends AbstractProcessor {
                         }
 
                         getLogger().error("Failed to update database due to a 
failed batch update. There were a total of {} FlowFiles that failed, {} that 
succeeded, "
-                            + "and {} that were not execute and will be routed 
to retry; ", new Object[] {failureCount, successCount, retryCount});
+                                + "and {} that were not execute and will be 
routed to retry; ", new Object[] {failureCount, successCount, retryCount});
                     } catch (final SQLNonTransientException e) {
                         getLogger().error("Failed to update database for {} 
due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e});
 
@@ -347,7 +356,7 @@ public class PutSQL extends AbstractProcessor {
                         continue;
                     } catch (final SQLException e) {
                         getLogger().error("Failed to update database for {} 
due to {}; it is possible that retrying the operation will succeed, so routing 
to retry",
-                            new Object[] {enclosure.getFlowFiles(), e});
+                                new Object[] {enclosure.getFlowFiles(), e});
 
                         for (final FlowFile flowFile : 
enclosure.getFlowFiles()) {
                             destinationRelationships.put(flowFile, REL_RETRY);
@@ -522,7 +531,7 @@ public class PutSQL extends AbstractProcessor {
      * @throws SQLException if unable to create the appropriate 
PreparedStatement
      */
     private StatementFlowFileEnclosure getEnclosure(final String sql, final 
Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap,
-        final boolean obtainKeys, final boolean fragmentedTransaction) throws 
SQLException {
+                                                    final boolean obtainKeys, 
final boolean fragmentedTransaction) throws SQLException {
         StatementFlowFileEnclosure enclosure = stmtMap.get(sql);
         if (enclosure != null) {
             return enclosure;
@@ -610,13 +619,17 @@ public class PutSQL extends AbstractProcessor {
                 final int jdbcType = Integer.parseInt(entry.getValue());
                 final String valueAttrName = "sql.args." + parameterIndex + 
".value";
                 final String parameterValue = attributes.get(valueAttrName);
+                final String formatAttrName = "sql.args." + parameterIndex + 
".format";
+                final String parameterFormat = 
attributes.containsKey(formatAttrName)? attributes.get(formatAttrName):"";
 
                 try {
-                    setParameter(stmt, valueAttrName, parameterIndex, 
parameterValue, jdbcType);
+                    setParameter(stmt, valueAttrName, parameterIndex, 
parameterValue, jdbcType, parameterFormat);
                 } catch (final NumberFormatException nfe) {
                     throw new ProcessException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted into 
the necessary data type", nfe);
                 } catch (ParseException pe) {
                     throw new ProcessException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted to a 
timestamp", pe);
+                } catch (UnsupportedEncodingException uee) {
+                    throw new ProcessException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted to 
UTF-8", uee);
                 }
             }
         }
@@ -644,7 +657,7 @@ public class PutSQL extends AbstractProcessor {
                 return null;
             } else if (fragmentCount == null) {
                 getLogger().error("Cannot process {} because there are {} 
FlowFiles with the same fragment.identifier "
-                    + "attribute but not all FlowFiles have a fragment.count 
attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
+                        + "attribute but not all FlowFiles have a 
fragment.count attribute; routing all to failure", new Object[] {flowFile, 
flowFiles.size()});
                 return REL_FAILURE;
             }
 
@@ -653,13 +666,13 @@ public class PutSQL extends AbstractProcessor {
                 numFragments = Integer.parseInt(fragmentCount);
             } catch (final NumberFormatException nfe) {
                 getLogger().error("Cannot process {} because the 
fragment.count attribute has a value of '{}', which is not an integer; "
-                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile, fragmentCount});
+                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile, fragmentCount});
                 return REL_FAILURE;
             }
 
             if (numFragments < 1) {
                 getLogger().error("Cannot process {} because the 
fragment.count attribute has a value of '{}', which is not a positive integer; "
-                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile, fragmentCount});
+                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile, fragmentCount});
                 return REL_FAILURE;
             }
 
@@ -667,14 +680,14 @@ public class PutSQL extends AbstractProcessor {
                 selectedNumFragments = numFragments;
             } else if (numFragments != selectedNumFragments) {
                 getLogger().error("Cannot process {} because the 
fragment.count attribute has different values for different FlowFiles with the 
same fragment.identifier; "
-                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile});
+                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile});
                 return REL_FAILURE;
             }
 
             final String fragmentIndex = 
flowFile.getAttribute(FRAGMENT_INDEX_ATTR);
             if (fragmentIndex == null) {
                 getLogger().error("Cannot process {} because the 
fragment.index attribute is missing; "
-                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile});
+                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile});
                 return REL_FAILURE;
             }
 
@@ -683,19 +696,19 @@ public class PutSQL extends AbstractProcessor {
                 idx = Integer.parseInt(fragmentIndex);
             } catch (final NumberFormatException nfe) {
                 getLogger().error("Cannot process {} because the 
fragment.index attribute has a value of '{}', which is not an integer; "
-                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile, fragmentIndex});
+                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile, fragmentIndex});
                 return REL_FAILURE;
             }
 
             if (idx < 0) {
                 getLogger().error("Cannot process {} because the 
fragment.index attribute has a value of '{}', which is not a positive integer; "
-                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile, fragmentIndex});
+                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile, fragmentIndex});
                 return REL_FAILURE;
             }
 
             if (bitSet.get(idx)) {
                 getLogger().error("Cannot process {} because it has the same 
value for the fragment.index attribute as another FlowFile with the same 
fragment.identifier; "
-                    + "routing all FlowFiles with this fragment.identifier to 
failure", new Object[] {flowFile});
+                        + "routing all FlowFiles with this fragment.identifier 
to failure", new Object[] {flowFile});
                 return REL_FAILURE;
             }
 
@@ -735,7 +748,9 @@ public class PutSQL extends AbstractProcessor {
      * @param jdbcType the JDBC Type of the SQL parameter to set
      * @throws SQLException if the PreparedStatement throws a SQLException 
when calling the appropriate setter
      */
-    private void setParameter(final PreparedStatement stmt, final String 
attrName, final int parameterIndex, final String parameterValue, final int 
jdbcType) throws SQLException, ParseException {
+    private void setParameter(final PreparedStatement stmt, final String 
attrName, final int parameterIndex, final String parameterValue, final int 
jdbcType,
+                              final String valueFormat)
+            throws SQLException, ParseException, UnsupportedEncodingException {
         if (parameterValue == null) {
             stmt.setNull(parameterIndex, jdbcType);
         } else {
@@ -787,6 +802,29 @@ public class PutSQL extends AbstractProcessor {
                     stmt.setTimestamp(parameterIndex, new 
Timestamp(lTimestamp));
 
                     break;
+                case Types.BINARY:
+                case Types.VARBINARY:
+                case Types.LONGVARBINARY:
+                    byte[] bValue;
+
+                    switch(valueFormat){
+                        case "":
+                        case "ascii":
+                            bValue = parameterValue.getBytes("ASCII");
+                            break;
+                        case "hex":
+                            bValue = 
DatatypeConverter.parseHexBinary(parameterValue);
+                            break;
+                        case "base64":
+                            bValue = 
DatatypeConverter.parseBase64Binary(parameterValue);
+                            break;
+                        default:
+                            throw new ParseException("Unable to parse binary 
data using the formatter `" + valueFormat + "`.",0);
+                    }
+
+                    stmt.setBinaryStream(parameterIndex, new 
ByteArrayInputStream(bValue), bValue.length);
+
+                    break;
                 case Types.CHAR:
                 case Types.VARCHAR:
                 case Types.LONGNVARCHAR:

http://git-wip-us.apache.org/repos/asf/nifi/blob/e210172d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
index 533961c..321bac7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
@@ -30,9 +31,11 @@ import java.sql.Statement;
 import java.sql.Types;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -46,6 +49,8 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
+import javax.xml.bind.DatatypeConverter;
+
 public class TestPutSQL {
     private static final String createPersons = "CREATE TABLE PERSONS (id 
integer primary key, name varchar(100), code integer)";
     private static final String createPersonsAutoId = "CREATE TABLE PERSONS_AI 
(id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name 
VARCHAR(100), code INTEGER check(code <= 100))";
@@ -305,6 +310,136 @@ public class TestPutSQL {
     }
 
     @Test
+    public void testBinaryColumnTypes() throws InitializationException, 
ProcessException, SQLException, IOException, ParseException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate("CREATE TABLE BINARYTESTS (id integer 
primary key, bn1 CHAR(8) FOR BIT DATA, bn2 VARCHAR(100) FOR BIT DATA, " +
+                        "bn3 LONG VARCHAR FOR BIT DATA)");
+            }
+        }
+
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+        final byte[] insertStatement = "INSERT INTO BINARYTESTS (ID, bn1, bn2, 
bn3) VALUES (?, ?, ?, ?)".getBytes();
+
+        final String arg2BIN = fixedSizeByteArrayAsASCIIString(8);
+        final String art3VARBIN = fixedSizeByteArrayAsASCIIString(50);
+        final String art4LongBin = fixedSizeByteArrayAsASCIIString(32700); 
//max size supported by Derby
+
+        //ASCII (default) binary format
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "1");
+        attributes.put("sql.args.2.type", String.valueOf(Types.BINARY));
+        attributes.put("sql.args.2.value", arg2BIN);
+        attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY));
+        attributes.put("sql.args.3.value", art3VARBIN);
+        attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY));
+        attributes.put("sql.args.4.value", art4LongBin);
+
+        runner.enqueue(insertStatement, attributes);
+
+        //ASCII with specified format
+        attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "2");
+        attributes.put("sql.args.2.type", String.valueOf(Types.BINARY));
+        attributes.put("sql.args.2.value", arg2BIN);
+        attributes.put("sql.args.2.format", "ascii");
+        attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY));
+        attributes.put("sql.args.3.value", art3VARBIN);
+        attributes.put("sql.args.3.format", "ascii");
+        attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY));
+        attributes.put("sql.args.4.value", art4LongBin);
+        attributes.put("sql.args.4.format", "ascii");
+
+        runner.enqueue(insertStatement, attributes);
+
+        //Hex
+        final String arg2HexBIN = fixedSizeByteArrayAsHexString(8);
+        final String art3HexVARBIN = fixedSizeByteArrayAsHexString(50);
+        final String art4HexLongBin = fixedSizeByteArrayAsHexString(32700);
+
+        attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "3");
+        attributes.put("sql.args.2.type", String.valueOf(Types.BINARY));
+        attributes.put("sql.args.2.value", arg2HexBIN);
+        attributes.put("sql.args.2.format", "hex");
+        attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY));
+        attributes.put("sql.args.3.value", art3HexVARBIN);
+        attributes.put("sql.args.3.format", "hex");
+        attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY));
+        attributes.put("sql.args.4.value", art4HexLongBin);
+        attributes.put("sql.args.4.format", "hex");
+
+        runner.enqueue(insertStatement, attributes);
+
+        //Base64
+        final String arg2Base64BIN = fixedSizeByteArrayAsBase64String(8);
+        final String art3Base64VARBIN = fixedSizeByteArrayAsBase64String(50);
+        final String art4Base64LongBin = 
fixedSizeByteArrayAsBase64String(32700);
+
+        attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        attributes.put("sql.args.1.value", "4");
+        attributes.put("sql.args.2.type", String.valueOf(Types.BINARY));
+        attributes.put("sql.args.2.value", arg2Base64BIN);
+        attributes.put("sql.args.2.format", "base64");
+        attributes.put("sql.args.3.type", String.valueOf(Types.VARBINARY));
+        attributes.put("sql.args.3.value", art3Base64VARBIN);
+        attributes.put("sql.args.3.format", "base64");
+        attributes.put("sql.args.4.type", String.valueOf(Types.LONGVARBINARY));
+        attributes.put("sql.args.4.value", art4Base64LongBin);
+        attributes.put("sql.args.4.format", "base64");
+
+        runner.enqueue(insertStatement, attributes);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 4);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM 
BINARYTESTS");
+
+                //First Batch
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertTrue(Arrays.equals(arg2BIN.getBytes("ASCII"), 
rs.getBytes(2)));
+                assertTrue(Arrays.equals(art3VARBIN.getBytes("ASCII"), 
rs.getBytes(3)));
+                assertTrue(Arrays.equals(art4LongBin.getBytes("ASCII"), 
rs.getBytes(4)));
+
+                //Second Batch
+                assertTrue(rs.next());
+                assertEquals(2, rs.getInt(1));
+                assertTrue(Arrays.equals(arg2BIN.getBytes("ASCII"), 
rs.getBytes(2)));
+                assertTrue(Arrays.equals(art3VARBIN.getBytes("ASCII"), 
rs.getBytes(3)));
+                assertTrue(Arrays.equals(art4LongBin.getBytes("ASCII"), 
rs.getBytes(4)));
+
+                //Third Batch (Hex)
+                assertTrue(rs.next());
+                assertEquals(3, rs.getInt(1));
+                
assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(arg2HexBIN), 
rs.getBytes(2)));
+                
assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(art3HexVARBIN), 
rs.getBytes(3)));
+                
assertTrue(Arrays.equals(DatatypeConverter.parseHexBinary(art4HexLongBin), 
rs.getBytes(4)));
+
+                //Fourth Batch (Base64)
+                assertTrue(rs.next());
+                assertEquals(4, rs.getInt(1));
+                
assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(arg2Base64BIN), 
rs.getBytes(2)));
+                
assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(art3Base64VARBIN), 
rs.getBytes(3)));
+                
assertTrue(Arrays.equals(DatatypeConverter.parseBase64Binary(art4Base64LongBin),
 rs.getBytes(4)));
+
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
     public void testStatementsWithPreparedParameters() throws 
InitializationException, ProcessException, SQLException, IOException {
         final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
         runner.addControllerService("dbcp", service);
@@ -373,7 +508,7 @@ public class TestPutSQL {
         runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
 
         final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
-            "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
         attributes.put("sql.args.1.value", "1");
@@ -444,7 +579,7 @@ public class TestPutSQL {
         recreateTable("PERSONS", createPersons);
 
         final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
-            "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
+                "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
         attributes.put("sql.args.1.value", "1");
@@ -483,7 +618,7 @@ public class TestPutSQL {
         runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
 
         final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, 
?); " +
-            "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+                "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
         attributes.put("sql.args.1.value", "1");
@@ -671,4 +806,24 @@ public class TestPutSQL {
             }
         }
     }
+
+    private String fixedSizeByteArrayAsASCIIString(int length){
+        byte[] bBinary = RandomUtils.nextBytes(length);
+        ByteBuffer bytes = ByteBuffer.wrap(bBinary);
+        StringBuffer sbBytes = new StringBuffer();
+        for (int i = bytes.position(); i < bytes.limit(); i++)
+            sbBytes.append((char)bytes.get(i));
+
+        return sbBytes.toString();
+    }
+
+    private String fixedSizeByteArrayAsHexString(int length){
+        byte[] bBinary = RandomUtils.nextBytes(length);
+        return DatatypeConverter.printHexBinary(bBinary);
+    }
+
+    private String fixedSizeByteArrayAsBase64String(int length){
+        byte[] bBinary = RandomUtils.nextBytes(length);
+        return DatatypeConverter.printBase64Binary(bBinary);
+    }
 }

Reply via email to