This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 7475232bb09c15bad67bf1fe863d9afd7f503569
Author: Lehel <lehe...@hotmail.com>
AuthorDate: Tue Jan 17 16:15:15 2023 +0100

    NIFI-11045: Sensitive dynamic property support for parameterized queries in ExecuteSQL and ExecuteSQLRecord
    
    This closes #6853.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../java/org/apache/nifi/util/db/JdbcCommon.java   | 78 ++++++++++++++--------
 .../apache/nifi/util/db/SensitiveValueWrapper.java | 36 ++++++++++
 .../org/apache/nifi/util/db/TestJdbcCommon.java    | 34 +++++++++-
 .../nifi-standard-processors/pom.xml               |  9 +--
 .../processors/standard/AbstractExecuteSQL.java    | 32 ++++++---
 .../nifi/processors/standard/ExecuteSQL.java       | 61 ++++++++++++++---
 .../nifi/processors/standard/ExecuteSQLRecord.java | 47 ++++++++++++-
 7 files changed, 243 insertions(+), 54 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
index ad7471a54c..dbd18e931d 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
@@ -68,9 +68,11 @@ import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.Date;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static java.sql.Types.ARRAY;
 import static java.sql.Types.BIGINT;
@@ -120,6 +122,7 @@ public class JdbcCommon {
     public static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
 
     public static final String MIME_TYPE_AVRO_BINARY = 
"application/avro-binary";
+    public static final String MASKED_LOG_VALUE = "MASKED VALUE";
 
     public static long convertToAvroStream(final ResultSet rs, final 
OutputStream outStream, boolean convertNames) throws SQLException, IOException {
         return convertToAvroStream(rs, outStream, null, null, convertNames);
@@ -681,32 +684,55 @@ public class JdbcCommon {
      * @throws SQLException if the PreparedStatement throws a SQLException 
when the appropriate setter is called
      */
     public static void setParameters(final PreparedStatement stmt, final 
Map<String, String> attributes) throws SQLException {
-        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
-            final String key = entry.getKey();
-            final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
-            if (matcher.matches()) {
-                final int parameterIndex = Integer.parseInt(matcher.group(1));
-
-                final boolean isNumeric = 
NUMBER_PATTERN.matcher(entry.getValue()).matches();
-                if (!isNumeric) {
-                    throw new SQLDataException("Value of the " + key + " 
attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral 
type");
-                }
+        final Map<String, SensitiveValueWrapper> sensitiveValueWrapperMap = 
attributes.entrySet()
+                .stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new 
SensitiveValueWrapper(e.getValue(), false)));
+        setSensitiveParameters(stmt, sensitiveValueWrapperMap);
+    }
+
+    /**
+     * Sets all of the appropriate parameters on the given PreparedStatement, 
based on the given FlowFile attributes
+     * and masks sensitive values.
+     *
+     * @param stmt the statement to set the parameters on
+     * @param attributes the attributes from which to derive parameter 
indices, values, and types
+     * @throws SQLException if the PreparedStatement throws a SQLException 
when the appropriate setter is called
+     */
+    public static void setSensitiveParameters(final PreparedStatement stmt, 
final Map<String, SensitiveValueWrapper> attributes) throws SQLException {
+        for (final Map.Entry<String, SensitiveValueWrapper> entry : 
attributes.entrySet()) {
+            final String flowFileAttributeKey = entry.getKey();
+            setParameterAtIndex(stmt, attributes, flowFileAttributeKey);
+        }
+    }
 
-                final int jdbcType = Integer.parseInt(entry.getValue());
-                final String valueAttrName = "sql.args." + parameterIndex + 
".value";
-                final String parameterValue = attributes.get(valueAttrName);
-                final String formatAttrName = "sql.args." + parameterIndex + 
".format";
-                final String parameterFormat = 
attributes.containsKey(formatAttrName)? attributes.get(formatAttrName):"";
+    private static void setParameterAtIndex(final PreparedStatement stmt, 
final Map<String, SensitiveValueWrapper> attributes, final String 
flowFileAttributeKey) throws SQLException {
+        final Matcher sqlArgumentTypeMatcher = 
SQL_TYPE_ATTRIBUTE_PATTERN.matcher(flowFileAttributeKey);
+        if (sqlArgumentTypeMatcher.matches()) {
+            final int sqlArgumentIndex = 
Integer.parseInt(sqlArgumentTypeMatcher.group(1));
 
-                try {
-                    JdbcCommon.setParameter(stmt, valueAttrName, 
parameterIndex, parameterValue, jdbcType, parameterFormat);
-                } catch (final NumberFormatException nfe) {
-                    throw new SQLDataException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted into 
the necessary data type", nfe);
-                } catch (ParseException pe) {
-                    throw new SQLDataException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted to a 
timestamp", pe);
-                } catch (UnsupportedEncodingException uee) {
-                    throw new SQLDataException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted to 
UTF-8", uee);
-                }
+            final String sqlArgumentTypeAttributeName = flowFileAttributeKey;
+            final String sqlArgumentType = 
attributes.get(sqlArgumentTypeAttributeName).getValue();
+            final boolean isNumeric = 
NUMBER_PATTERN.matcher(sqlArgumentType).matches();
+            if (!isNumeric) {
+                throw new SQLDataException("Value of the " + 
sqlArgumentTypeAttributeName + " attribute is '" + sqlArgumentType + "', which 
is not a valid numeral SQL data type");
+            }
+
+            final int sqlType = Integer.parseInt(sqlArgumentType);
+            final String sqlArgumentValueAttributeName = "sql.args." + 
sqlArgumentIndex + ".value";
+            final Optional<SensitiveValueWrapper> sqlArgumentValueWrapper = 
Optional.ofNullable(attributes.get(sqlArgumentValueAttributeName));
+            final String sqlArgumentValue = 
sqlArgumentValueWrapper.map(SensitiveValueWrapper::getValue).orElse(null);
+            final String sqlArgumentLogValue = sqlArgumentValueWrapper.map(a 
-> a.isSensitive() ? MASKED_LOG_VALUE : a.getValue()).orElse(null);
+            final String sqlArgumentFormatAttributeName = "sql.args." + 
sqlArgumentIndex + ".format";
+            final String sqlArgumentFormat = 
attributes.containsKey(sqlArgumentFormatAttributeName) ? 
attributes.get(sqlArgumentFormatAttributeName).getValue() : "";
+
+            try {
+                JdbcCommon.setParameter(stmt, sqlArgumentIndex, 
sqlArgumentValue, sqlType, sqlArgumentFormat);
+            } catch (final NumberFormatException nfe) {
+                throw new SQLDataException("The value of the " + 
sqlArgumentValueAttributeName + " is '" + sqlArgumentLogValue + "', which 
cannot be converted into the necessary data type", nfe);
+            } catch (ParseException pe) {
+                throw new SQLDataException("The value of the " + 
sqlArgumentValueAttributeName + " is '" + sqlArgumentLogValue + "', which 
cannot be converted to a timestamp", pe);
+            } catch (UnsupportedEncodingException uee) {
+                throw new SQLDataException("The value of the " + 
sqlArgumentValueAttributeName + " is '" + sqlArgumentLogValue + "', which 
cannot be converted to UTF-8", uee);
             }
         }
     }
@@ -716,13 +742,12 @@ public class JdbcCommon {
      * provided PreparedStatement
      *
      * @param stmt the PreparedStatement to set the parameter on
-     * @param attrName the name of the attribute that the parameter is coming 
from - for logging purposes
      * @param parameterIndex the index of the SQL parameter to set
      * @param parameterValue the value of the SQL parameter to set
      * @param jdbcType the JDBC Type of the SQL parameter to set
      * @throws SQLException if the PreparedStatement throws a SQLException 
when calling the appropriate setter
      */
-    public static void setParameter(final PreparedStatement stmt, final String 
attrName, final int parameterIndex, final String parameterValue, final int 
jdbcType,
+    public static void setParameter(final PreparedStatement stmt, final int 
parameterIndex, final String parameterValue, final int jdbcType,
                               final String valueFormat)
             throws SQLException, ParseException, UnsupportedEncodingException {
         if (parameterValue == null) {
@@ -902,5 +927,4 @@ public class JdbcCommon {
         void processRow(ResultSet resultSet) throws IOException;
         void applyStateChanges();
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/SensitiveValueWrapper.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/SensitiveValueWrapper.java
new file mode 100644
index 0000000000..686e43f9f9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/SensitiveValueWrapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.db;
+
+public class SensitiveValueWrapper {
+
+    private final String value;
+    private final boolean sensitive;
+
+    public SensitiveValueWrapper(final String value, final boolean sensitive) {
+        this.value = value;
+        this.sensitive = sensitive;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public boolean isSensitive() {
+        return sensitive;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
index aa858316a6..20e6cb87df 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
@@ -67,14 +67,18 @@ import java.sql.Types;
 import java.time.Instant;
 import java.time.LocalTime;
 import java.time.ZoneId;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.stream.IntStream;
 
+import static org.apache.nifi.util.db.JdbcCommon.MASKED_LOG_VALUE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -196,7 +200,7 @@ public class TestJdbcCommon {
     }
 
     @Test
-    public void testCreateSchemaOnlyColumnLabel() throws 
ClassNotFoundException, SQLException {
+    public void testCreateSchemaOnlyColumnLabel() throws SQLException {
 
         final ResultSet resultSet = mock(ResultSet.class);
         final ResultSetMetaData resultSetMetaData = 
mock(ResultSetMetaData.class);
@@ -845,4 +849,32 @@ public class TestJdbcCommon {
         assertNotNull(clazz);
     }
 
+    @Test
+    public void testSetSensitiveParametersDoesNotLogSensitiveValues() throws 
SQLException {
+        final Map<String, SensitiveValueWrapper> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", new SensitiveValueWrapper("4", 
false));
+        attributes.put("sql.args.1.value", new SensitiveValueWrapper("123.4", 
true));
+        try (final Statement stmt = con.createStatement()) {
+            stmt.executeUpdate("CREATE TABLE inttest (id INT)");
+            PreparedStatement ps = con.prepareStatement("INSERT INTO inttest 
VALUES (?)");
+            final SQLException exception = assertThrows(SQLException.class, () 
-> JdbcCommon.setSensitiveParameters(ps, attributes));
+            assertTrue(exception.getMessage().contains(MASKED_LOG_VALUE));
+            assertFalse(exception.getMessage().contains("123.4"));
+        }
+    }
+
+    @Test
+    public void testSetParametersDoesNotHaveSensitiveValues() throws 
SQLException {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", "4");
+        attributes.put("sql.args.1.value","123.4");
+        try (final Statement stmt = con.createStatement()) {
+            stmt.executeUpdate("CREATE TABLE inttest (id INT)");
+            PreparedStatement ps = con.prepareStatement("INSERT INTO inttest 
VALUES (?)");
+            final SQLException exception = assertThrows(SQLException.class, () 
-> JdbcCommon.setParameters(ps, attributes));
+            assertFalse(exception.getMessage().contains(MASKED_LOG_VALUE));
+            assertTrue(exception.getMessage().contains("123.4"));
+        }
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 7d5be10a20..6ecc22e9a6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -520,8 +520,8 @@
                         <exclude>src/test/resources/hello.txt</exclude>
                         
<exclude>src/test/resources/CharacterSetConversionSamples/Converted.txt</exclude>
                         
<exclude>src/test/resources/CharacterSetConversionSamples/Original.txt</exclude>
-                        
<exclude>src/test/resources/CompressedData/SampleFile.txt</exclude>
-                        
<exclude>src/test/resources/CompressedData/SampleFileConcat.txt</exclude>
+                        
<exclude>src/test/resources/CompressedData/SampleFile.txt*</exclude>
+                        
<exclude>src/test/resources/CompressedData/SampleFileConcat.txt*</exclude>
                         
<exclude>src/test/resources/ExecuteCommand/1000bytes.txt</exclude>
                         
<exclude>src/test/resources/ExecuteCommand/1mb.txt</exclude>
                         
<exclude>src/test/resources/ExecuteCommand/test.txt</exclude>
@@ -637,11 +637,6 @@
                         
<exclude>src/test/resources/TestXml/xml-bundle-1</exclude>
                         
<exclude>src/test/resources/TestXml/namespaceSplit1.xml</exclude>
                         
<exclude>src/test/resources/TestXml/namespaceSplit2.xml</exclude>
-                        
<exclude>src/test/resources/CompressedData/SampleFile.txt.bz2</exclude>
-                        
<exclude>src/test/resources/CompressedData/SampleFile.txt.gz</exclude>
-                        
<exclude>src/test/resources/CompressedData/SampleFile1.txt.bz2</exclude>
-                        
<exclude>src/test/resources/CompressedData/SampleFile1.txt.gz</exclude>
-                        
<exclude>src/test/resources/CompressedData/SampleFileConcat.txt.bz2</exclude>
                         
<exclude>src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar</exclude>
                         
<exclude>src/test/resources/ExecuteCommand/TestSuccess.jar</exclude>
                         
<exclude>src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 55a4326908..cb6bc761f8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -35,6 +35,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.db.JdbcCommon;
+import org.apache.nifi.util.db.SensitiveValueWrapper;
 
 import java.nio.charset.Charset;
 import java.sql.Connection;
@@ -52,6 +53,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 
 public abstract class AbstractExecuteSQL extends AbstractProcessor {
@@ -273,10 +275,23 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
                     throw failure.getRight();
                 }
 
+                final Map<String, SensitiveValueWrapper> sqlParameters = 
context.getProperties()
+                        .entrySet()
+                        .stream()
+                        .filter(e -> e.getKey().isDynamic())
+                        .collect(Collectors.toMap(e -> e.getKey().getName(), e 
-> new SensitiveValueWrapper(e.getValue(), e.getKey().isSensitive())));
+
                 if (fileToProcess != null) {
-                    JdbcCommon.setParameters(st, 
fileToProcess.getAttributes());
+                    for (Map.Entry<String, String> entry : 
fileToProcess.getAttributes().entrySet()) {
+                        sqlParameters.put(entry.getKey(), new 
SensitiveValueWrapper(entry.getValue(), false));
+                    }
                 }
-                logger.debug("Executing query {}", new Object[]{selectQuery});
+
+                if (!sqlParameters.isEmpty()) {
+                    JdbcCommon.setSensitiveParameters(st, sqlParameters);
+                }
+
+                logger.debug("Executing query {}", selectQuery);
 
                 int fragmentIndex = 0;
                 final String fragmentId = UUID.randomUUID().toString();
@@ -350,7 +365,7 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
                                         resultSetFF = 
session.putAttribute(resultSetFF, FRAGMENT_INDEX, 
String.valueOf(fragmentIndex));
                                     }
 
-                                    logger.info("{} contains {} records; 
transferring to 'success'", new Object[]{resultSetFF, nrOfRows.get()});
+                                    logger.info("{} contains {} records; 
transferring to 'success'", resultSetFF, nrOfRows.get());
 
                                     // Report a FETCH event if there was an 
incoming flow file, or a RECEIVE event otherwise
                                     if (context.hasIncomingConnection()) {
@@ -414,7 +429,7 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
                 failure = executeConfigStatements(con, postQueries);
                 if (failure != null) {
                     selectQuery = failure.getLeft();
-                    resultSetFlowFiles.forEach(ff -> session.remove(ff));
+                    resultSetFlowFiles.forEach(session::remove);
                     throw failure.getRight();
                 }
 
@@ -454,17 +469,14 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
             //  pass the original flow file down the line to trigger 
downstream processors
             if (fileToProcess == null) {
                 // This can happen if any exceptions occur while setting up 
the connection, statement, etc.
-                logger.error("Unable to execute SQL select query {} due to {}. 
No FlowFile to route to failure",
-                        new Object[]{selectQuery, e});
+                logger.error("Unable to execute SQL select query [{}]. No 
FlowFile to route to failure", selectQuery, e);
                 context.yield();
             } else {
                 if (context.hasIncomingConnection()) {
-                    logger.error("Unable to execute SQL select query {} for {} 
due to {}; routing to failure",
-                            new Object[]{selectQuery, fileToProcess, e});
+                    logger.error("Unable to execute SQL select query [{}] for 
{} routing to failure", selectQuery, fileToProcess, e);
                     fileToProcess = session.penalize(fileToProcess);
                 } else {
-                    logger.error("Unable to execute SQL select query {} due to 
{}; routing to failure",
-                            new Object[]{selectQuery, e});
+                    logger.error("Unable to execute SQL select query [{}] 
routing to failure", selectQuery, e);
                     context.yield();
                 }
                 
session.putAttribute(fileToProcess,RESULT_ERROR_MESSAGE,e.getMessage());
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index cc819db405..850657b97e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -16,36 +16,41 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
+import org.apache.nifi.annotation.behavior.DynamicProperties;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
 import org.apache.nifi.util.db.JdbcCommon;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.nifi.util.db.AvroUtil.CodecType;
 import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION;
 import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE;
 import static org.apache.nifi.util.db.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO;
 import static org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
-import static org.apache.nifi.util.db.AvroUtil.CodecType;
 
 @EventDriven
 @InputRequirement(Requirement.INPUT_ALLOWED)
@@ -60,7 +65,9 @@ import static org.apache.nifi.util.db.AvroUtil.CodecType;
         + "FlowFile attribute 'executesql.row.count' indicates how many rows 
were selected.")
 @ReadsAttributes({
         @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming 
FlowFiles are expected to be parametrized SQL statements. The type of each 
Parameter is specified as an integer "
-                + "that represents the JDBC Type of the parameter."),
+                + "that represents the JDBC Type of the parameter. The 
following types are accepted: [LONGNVARCHAR: -16], [BIT: -7], [BOOLEAN: 16], 
[TINYINT: -6], [BIGINT: -5], "
+                + "[LONGVARBINARY: -4], [VARBINARY: -3], [BINARY: -2], 
[LONGVARCHAR: -1], [CHAR: 1], [NUMERIC: 2], [DECIMAL: 3], [INTEGER: 4], 
[SMALLINT: 5] "
+                + "[FLOAT: 6], [REAL: 7], [DOUBLE: 8], [VARCHAR: 12], [DATE: 
91], [TIME: 92], [TIMESTAMP: 93], [VARCHAR: 12], [CLOB: 2005], [NCLOB: 2011]"),
         @ReadsAttribute(attribute = "sql.args.N.value", description = 
"Incoming FlowFiles are expected to be parametrized SQL statements. The value 
of the Parameters are specified as "
                 + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and 
so on. The type of the sql.args.1.value Parameter is specified by the 
sql.args.1.type attribute."),
         @ReadsAttribute(attribute = "sql.args.N.format", description = "This 
attribute is always optional, but default options may not always work for your 
data. "
@@ -102,6 +109,33 @@ import static org.apache.nifi.util.db.AvroUtil.CodecType;
         @WritesAttribute(attribute = "input.flowfile.uuid", description = "If 
the processor has an incoming connection, outgoing FlowFiles will have this 
attribute "
                 + "set to the value of the input FlowFile's UUID. If there is 
no incoming connection, the attribute will not be added.")
 })
+@SupportsSensitiveDynamicProperties
+@DynamicProperties({
+        @DynamicProperty(name = "sql.args.N.type",
+                value = "SQL type argument to be supplied",
+                description = "Incoming FlowFiles are expected to be 
parametrized SQL statements. The type of each Parameter is specified as an 
integer "
+                        + "that represents the JDBC Type of the parameter. The 
following types are accepted: [LONGNVARCHAR: -16], [BIT: -7], [BOOLEAN: 16], 
[TINYINT: -6], [BIGINT: -5], "
+                        + "[LONGVARBINARY: -4], [VARBINARY: -3], [BINARY: -2], 
[LONGVARCHAR: -1], [CHAR: 1], [NUMERIC: 2], [DECIMAL: 3], [INTEGER: 4], 
[SMALLINT: 5] "
+                        + "[FLOAT: 6], [REAL: 7], [DOUBLE: 8], [VARCHAR: 12], 
[DATE: 91], [TIME: 92], [TIMESTAMP: 93], [VARCHAR: 12], [CLOB: 2005], [NCLOB: 
2011]"),
+        @DynamicProperty(name = "sql.args.N.value",
+                value = "Argument to be supplied",
+                description = "Incoming FlowFiles are expected to be 
parametrized SQL statements. The value of the Parameters are specified as "
+                        + "sql.args.1.value, sql.args.2.value, 
sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is 
specified by the sql.args.1.type attribute."),
+        @DynamicProperty(name = "sql.args.N.format",
+                value = "SQL format argument to be supplied",
+                description = "This attribute is always optional, but default 
options may not always work for your data. "
+                        + "Incoming FlowFiles are expected to be parametrized 
SQL statements. In some cases "
+                        + "a format option needs to be specified, currently 
this is only applicable for binary data types, dates, times and timestamps. 
Binary Data Types (defaults to 'ascii') - "
+                        + "ascii: each string character in your attribute 
value represents a single byte. This is the format provided by Avro Processors. 
"
+                        + "base64: the string is a Base64 encoded string that 
can be decoded to bytes. "
+                        + "hex: the string is hex encoded with all letters in 
upper case and no '0x' at the beginning. "
+                        + "Dates/Times/Timestamps - "
+                        + "Date, Time and Timestamp formats all support both 
custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') "
+                        + "as specified according to 
java.time.format.DateTimeFormatter. "
+                        + "If not specified, a long value input is expected to 
be an unix epoch (milli seconds from 1970/1/1), or a string value in "
+                        + "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for 
Time (some database engines e.g. Derby or MySQL do not support milliseconds and 
will truncate milliseconds), "
+                        + "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")
+})
 public class ExecuteSQL extends AbstractExecuteSQL {
 
     public static final PropertyDescriptor COMPRESSION_FORMAT = new 
PropertyDescriptor.Builder()
@@ -157,4 +191,15 @@ public class ExecuteSQL extends AbstractExecuteSQL {
                 .build();
         return new DefaultAvroSqlWriter(options);
     }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .dynamic(true)
+                
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
+                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .build();
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
index 2ebddacef0..cd36e42fef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -16,20 +16,25 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.nifi.annotation.behavior.DynamicProperties;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -59,7 +64,9 @@ import static 
org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
         + "FlowFile attribute 'executesql.row.count' indicates how many rows 
were selected.")
 @ReadsAttributes({
         @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming 
FlowFiles are expected to be parametrized SQL statements. The type of each 
Parameter is specified as an integer "
-                + "that represents the JDBC Type of the parameter."),
+                + "that represents the JDBC Type of the parameter. The 
following types are accepted: [LONGNVARCHAR: -16], [BIT: -7], [BOOLEAN: 16], 
[TINYINT: -6], [BIGINT: -5], "
+                + "[LONGVARBINARY: -4], [VARBINARY: -3], [BINARY: -2], 
[LONGVARCHAR: -1], [CHAR: 1], [NUMERIC: 2], [DECIMAL: 3], [INTEGER: 4], 
[SMALLINT: 5] "
+                + "[FLOAT: 6], [REAL: 7], [DOUBLE: 8], [VARCHAR: 12], [DATE: 
91], [TIME: 92], [TIMESTAMP: 93], [VARCHAR: 12], [CLOB: 2005], [NCLOB: 2011]"),
         @ReadsAttribute(attribute = "sql.args.N.value", description = 
"Incoming FlowFiles are expected to be parametrized SQL statements. The value 
of the Parameters are specified as "
                 + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and 
so on. The type of the sql.args.1.value Parameter is specified by the 
sql.args.1.type attribute."),
         @ReadsAttribute(attribute = "sql.args.N.format", description = "This 
attribute is always optional, but default options may not always work for your 
data. "
@@ -99,6 +106,33 @@ import static 
org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
         @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer."),
         @WritesAttribute(attribute = "record.count", description = "The number 
of records output by the Record Writer.")
 })
+@SupportsSensitiveDynamicProperties
+@DynamicProperties({
+        @DynamicProperty(name = "sql.args.N.type",
+                value = "SQL type argument to be supplied",
+                description = "Incoming FlowFiles are expected to be 
parametrized SQL statements. The type of each Parameter is specified as an 
integer "
+                        + "that represents the JDBC Type of the parameter. The 
following types are accepted: [LONGNVARCHAR: -16], [BIT: -7], [BOOLEAN: 16], 
[TINYINT: -6], [BIGINT: -5], "
+                        + "[LONGVARBINARY: -4], [VARBINARY: -3], [BINARY: -2], 
[LONGVARCHAR: -1], [CHAR: 1], [NUMERIC: 2], [DECIMAL: 3], [INTEGER: 4], 
[SMALLINT: 5] "
+                        + "[FLOAT: 6], [REAL: 7], [DOUBLE: 8], [VARCHAR: 12], 
[DATE: 91], [TIME: 92], [TIMESTAMP: 93], [VARCHAR: 12], [CLOB: 2005], [NCLOB: 
2011]"),
+        @DynamicProperty(name = "sql.args.N.value",
+                value = "Argument to be supplied",
+                description = "Incoming FlowFiles are expected to be 
parametrized SQL statements. The value of the Parameters are specified as "
+                        + "sql.args.1.value, sql.args.2.value, 
sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is 
specified by the sql.args.1.type attribute."),
+        @DynamicProperty(name = "sql.args.N.format",
+                value = "SQL format argument to be supplied",
+                description = "This attribute is always optional, but default 
options may not always work for your data. "
+                        + "Incoming FlowFiles are expected to be parametrized 
SQL statements. In some cases "
+                        + "a format option needs to be specified, currently 
this is only applicable for binary data types, dates, times and timestamps. 
Binary Data Types (defaults to 'ascii') - "
+                        + "ascii: each string character in your attribute 
value represents a single byte. This is the format provided by Avro Processors. 
"
+                        + "base64: the string is a Base64 encoded string that 
can be decoded to bytes. "
+                        + "hex: the string is hex encoded with all letters in 
upper case and no '0x' at the beginning. "
+                        + "Dates/Times/Timestamps - "
+                        + "Date, Time and Timestamp formats all support both 
custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') "
+                        + "as specified according to 
java.time.format.DateTimeFormatter. "
+                        + "If not specified, a long value input is expected to 
be an unix epoch (milli seconds from 1970/1/1), or a string value in "
+                        + "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for 
Time (some database engines e.g. Derby or MySQL do not support milliseconds and 
will truncate milliseconds), "
+                        + "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")
+})
 public class ExecuteSQLRecord extends AbstractExecuteSQL {
 
 
@@ -161,4 +195,15 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL {
 
         return new RecordSqlWriter(recordSetWriterFactory, options, 
maxRowsPerFlowFile, fileToProcess == null ? Collections.emptyMap() : 
fileToProcess.getAttributes());
     }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .dynamic(true)
+                
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
+                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .build();
+    }
 }


Reply via email to