This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 7475232bb09c15bad67bf1fe863d9afd7f503569 Author: Lehel <lehe...@hotmail.com> AuthorDate: Tue Jan 17 16:15:15 2023 +0100 NIFI-11045: Sensitive dynamic property support for parameterized queries in ExecuteSQL and ExecuteSQLRecord This closes #6853. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../java/org/apache/nifi/util/db/JdbcCommon.java | 78 ++++++++++++++-------- .../apache/nifi/util/db/SensitiveValueWrapper.java | 36 ++++++++++ .../org/apache/nifi/util/db/TestJdbcCommon.java | 34 +++++++++- .../nifi-standard-processors/pom.xml | 9 +-- .../processors/standard/AbstractExecuteSQL.java | 32 ++++++--- .../nifi/processors/standard/ExecuteSQL.java | 61 ++++++++++++++--- .../nifi/processors/standard/ExecuteSQLRecord.java | 47 ++++++++++++- 7 files changed, 243 insertions(+), 54 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java index ad7471a54c..dbd18e931d 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java @@ -68,9 +68,11 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.Map; +import java.util.Optional; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static java.sql.Types.ARRAY; import static java.sql.Types.BIGINT; @@ -120,6 +122,7 @@ public class JdbcCommon { public static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary"; + public static final String MASKED_LOG_VALUE = "MASKED VALUE"; public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException { return convertToAvroStream(rs, outStream, null, null, convertNames); @@ -681,32 +684,55 @@ public class JdbcCommon { * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called */ public static void setParameters(final PreparedStatement stmt, final Map<String, String> attributes) throws SQLException { - for (final Map.Entry<String, String> entry : attributes.entrySet()) { - final String key = entry.getKey(); - final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key); - if (matcher.matches()) { - final int parameterIndex = Integer.parseInt(matcher.group(1)); - - final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches(); - if (!isNumeric) { - throw new SQLDataException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type"); - } + final Map<String, SensitiveValueWrapper> sensitiveValueWrapperMap = attributes.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> new SensitiveValueWrapper(e.getValue(), false))); + setSensitiveParameters(stmt, sensitiveValueWrapperMap); + } + + /** + * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes + * and masks sensitive values. + * + * @param stmt the statement to set the parameters on + * @param attributes the attributes from which to derive parameter indices, values, and types + * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called + */ + public static void setSensitiveParameters(final PreparedStatement stmt, final Map<String, SensitiveValueWrapper> attributes) throws SQLException { + for (final Map.Entry<String, SensitiveValueWrapper> entry : attributes.entrySet()) { + final String flowFileAttributeKey = entry.getKey(); + setParameterAtIndex(stmt, attributes, flowFileAttributeKey); + } + } - final int jdbcType = Integer.parseInt(entry.getValue()); - final String valueAttrName = "sql.args." + parameterIndex + ".value"; - final String parameterValue = attributes.get(valueAttrName); - final String formatAttrName = "sql.args." + parameterIndex + ".format"; - final String parameterFormat = attributes.containsKey(formatAttrName)? attributes.get(formatAttrName):""; + private static void setParameterAtIndex(final PreparedStatement stmt, final Map<String, SensitiveValueWrapper> attributes, final String flowFileAttributeKey) throws SQLException { + final Matcher sqlArgumentTypeMatcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(flowFileAttributeKey); + if (sqlArgumentTypeMatcher.matches()) { + final int sqlArgumentIndex = Integer.parseInt(sqlArgumentTypeMatcher.group(1)); - try { - JdbcCommon.setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType, parameterFormat); - } catch (final NumberFormatException nfe) { - throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe); - } catch (ParseException pe) { - throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe); - } catch (UnsupportedEncodingException uee) { - throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to UTF-8", uee); - } + final String sqlArgumentTypeAttributeName = flowFileAttributeKey; + final String sqlArgumentType = attributes.get(sqlArgumentTypeAttributeName).getValue(); + final boolean isNumeric = NUMBER_PATTERN.matcher(sqlArgumentType).matches(); + if (!isNumeric) { + throw new SQLDataException("Value of the " + sqlArgumentTypeAttributeName + " attribute is '" + sqlArgumentType + "', which is not a valid numeral SQL data type"); + } + + final int sqlType = Integer.parseInt(sqlArgumentType); + final String sqlArgumentValueAttributeName = "sql.args." + sqlArgumentIndex + ".value"; + final Optional<SensitiveValueWrapper> sqlArgumentValueWrapper = Optional.ofNullable(attributes.get(sqlArgumentValueAttributeName)); + final String sqlArgumentValue = sqlArgumentValueWrapper.map(SensitiveValueWrapper::getValue).orElse(null); + final String sqlArgumentLogValue = sqlArgumentValueWrapper.map(a -> a.isSensitive() ? MASKED_LOG_VALUE : a.getValue()).orElse(null); + final String sqlArgumentFormatAttributeName = "sql.args." + sqlArgumentIndex + ".format"; + final String sqlArgumentFormat = attributes.containsKey(sqlArgumentFormatAttributeName) ? attributes.get(sqlArgumentFormatAttributeName).getValue() : ""; + + try { + JdbcCommon.setParameter(stmt, sqlArgumentIndex, sqlArgumentValue, sqlType, sqlArgumentFormat); + } catch (final NumberFormatException nfe) { + throw new SQLDataException("The value of the " + sqlArgumentValueAttributeName + " is '" + sqlArgumentLogValue + "', which cannot be converted into the necessary data type", nfe); + } catch (ParseException pe) { + throw new SQLDataException("The value of the " + sqlArgumentValueAttributeName + " is '" + sqlArgumentLogValue + "', which cannot be converted to a timestamp", pe); + } catch (UnsupportedEncodingException uee) { + throw new SQLDataException("The value of the " + sqlArgumentValueAttributeName + " is '" + sqlArgumentLogValue + "', which cannot be converted to UTF-8", uee); } } } @@ -716,13 +742,12 @@ public class JdbcCommon { * provided PreparedStatement * * @param stmt the PreparedStatement to set the parameter on - * @param attrName the name of the attribute that the parameter is coming from - for logging purposes * @param parameterIndex the index of the SQL parameter to set * @param parameterValue the value of the SQL parameter to set * @param jdbcType the JDBC Type of the SQL parameter to set * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter */ - public static void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType, + public static void setParameter(final PreparedStatement stmt, final int parameterIndex, final String parameterValue, final int jdbcType, final String valueFormat) throws SQLException, ParseException, UnsupportedEncodingException { if (parameterValue == null) { @@ -902,5 +927,4 @@ public class JdbcCommon { void processRow(ResultSet resultSet) throws IOException; void applyStateChanges(); } - } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/SensitiveValueWrapper.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/SensitiveValueWrapper.java new file mode 100644 index 0000000000..686e43f9f9 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/SensitiveValueWrapper.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.db; + +public class SensitiveValueWrapper { + + private final String value; + private final boolean sensitive; + + public SensitiveValueWrapper(final String value, final boolean sensitive) { + this.value = value; + this.sensitive = sensitive; + } + + public String getValue() { + return value; + } + + public boolean isSensitive() { + return sensitive; + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java index aa858316a6..20e6cb87df 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java @@ -67,14 +67,18 @@ import java.sql.Types; import java.time.Instant; import java.time.LocalTime; import java.time.ZoneId; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.stream.IntStream; +import static org.apache.nifi.util.db.JdbcCommon.MASKED_LOG_VALUE; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -196,7 +200,7 @@ public class TestJdbcCommon { } @Test - public void testCreateSchemaOnlyColumnLabel() throws ClassNotFoundException, SQLException { + public void testCreateSchemaOnlyColumnLabel() throws SQLException { final ResultSet resultSet = mock(ResultSet.class); final ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class); @@ -845,4 +849,32 @@ public class TestJdbcCommon { assertNotNull(clazz); } + @Test + public void testSetSensitiveParametersDoesNotLogSensitiveValues() throws SQLException { + final Map<String, SensitiveValueWrapper> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", new SensitiveValueWrapper("4", false)); + attributes.put("sql.args.1.value", new SensitiveValueWrapper("123.4", true)); + try (final Statement stmt = con.createStatement()) { + stmt.executeUpdate("CREATE TABLE inttest (id INT)"); + PreparedStatement ps = con.prepareStatement("INSERT INTO inttest VALUES (?)"); + final SQLException exception = assertThrows(SQLException.class, () -> JdbcCommon.setSensitiveParameters(ps, attributes)); + assertTrue(exception.getMessage().contains(MASKED_LOG_VALUE)); + assertFalse(exception.getMessage().contains("123.4")); + } + } + + @Test + public void testSetParametersDoesNotHaveSensitiveValues() throws SQLException { + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", "4"); + attributes.put("sql.args.1.value","123.4"); + try (final Statement stmt = con.createStatement()) { + stmt.executeUpdate("CREATE TABLE inttest (id INT)"); + PreparedStatement ps = con.prepareStatement("INSERT INTO inttest VALUES (?)"); + final SQLException exception = assertThrows(SQLException.class, () -> JdbcCommon.setParameters(ps, attributes)); + assertFalse(exception.getMessage().contains(MASKED_LOG_VALUE)); + assertTrue(exception.getMessage().contains("123.4")); + } + } + } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 7d5be10a20..6ecc22e9a6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -520,8 +520,8 @@ <exclude>src/test/resources/hello.txt</exclude> <exclude>src/test/resources/CharacterSetConversionSamples/Converted.txt</exclude> <exclude>src/test/resources/CharacterSetConversionSamples/Original.txt</exclude> - <exclude>src/test/resources/CompressedData/SampleFile.txt</exclude> - <exclude>src/test/resources/CompressedData/SampleFileConcat.txt</exclude> + <exclude>src/test/resources/CompressedData/SampleFile.txt*</exclude> + <exclude>src/test/resources/CompressedData/SampleFileConcat.txt*</exclude> <exclude>src/test/resources/ExecuteCommand/1000bytes.txt</exclude> <exclude>src/test/resources/ExecuteCommand/1mb.txt</exclude> <exclude>src/test/resources/ExecuteCommand/test.txt</exclude> @@ -637,11 +637,6 @@ <exclude>src/test/resources/TestXml/xml-bundle-1</exclude> <exclude>src/test/resources/TestXml/namespaceSplit1.xml</exclude> <exclude>src/test/resources/TestXml/namespaceSplit2.xml</exclude> - <exclude>src/test/resources/CompressedData/SampleFile.txt.bz2</exclude> - <exclude>src/test/resources/CompressedData/SampleFile.txt.gz</exclude> - <exclude>src/test/resources/CompressedData/SampleFile1.txt.bz2</exclude> - <exclude>src/test/resources/CompressedData/SampleFile1.txt.gz</exclude> - <exclude>src/test/resources/CompressedData/SampleFileConcat.txt.bz2</exclude> <exclude>src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar</exclude> <exclude>src/test/resources/ExecuteCommand/TestSuccess.jar</exclude> <exclude>src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar</exclude> diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java index 55a4326908..cb6bc761f8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java @@ -35,6 +35,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.sql.SqlWriter; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.db.JdbcCommon; +import org.apache.nifi.util.db.SensitiveValueWrapper; import java.nio.charset.Charset; import java.sql.Connection; @@ -52,6 +53,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; public abstract class AbstractExecuteSQL extends AbstractProcessor { @@ -273,10 +275,23 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { throw failure.getRight(); } + final Map<String, SensitiveValueWrapper> sqlParameters = context.getProperties() + .entrySet() + .stream() + .filter(e -> e.getKey().isDynamic()) + .collect(Collectors.toMap(e -> e.getKey().getName(), e -> new SensitiveValueWrapper(e.getValue(), e.getKey().isSensitive()))); + if (fileToProcess != null) { - JdbcCommon.setParameters(st, fileToProcess.getAttributes()); + for (Map.Entry<String, String> entry : fileToProcess.getAttributes().entrySet()) { + sqlParameters.put(entry.getKey(), new SensitiveValueWrapper(entry.getValue(), false)); + } } - logger.debug("Executing query {}", new Object[]{selectQuery}); + + if (!sqlParameters.isEmpty()) { + JdbcCommon.setSensitiveParameters(st, sqlParameters); + } + + logger.debug("Executing query {}", selectQuery); int fragmentIndex = 0; final String fragmentId = UUID.randomUUID().toString(); @@ -350,7 +365,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex)); } - logger.info("{} contains {} records; transferring to 'success'", new Object[]{resultSetFF, nrOfRows.get()}); + logger.info("{} contains {} records; transferring to 'success'", resultSetFF, nrOfRows.get()); // Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise if (context.hasIncomingConnection()) { @@ -414,7 +429,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { failure = executeConfigStatements(con, postQueries); if (failure != null) { selectQuery = failure.getLeft(); - resultSetFlowFiles.forEach(ff -> session.remove(ff)); + resultSetFlowFiles.forEach(session::remove); throw failure.getRight(); } @@ -454,17 +469,14 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { // pass the original flow file down the line to trigger downstream processors if (fileToProcess == null) { // This can happen if any exceptions occur while setting up the connection, statement, etc. - logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure", - new Object[]{selectQuery, e}); + logger.error("Unable to execute SQL select query [{}]. No FlowFile to route to failure", selectQuery, e); context.yield(); } else { if (context.hasIncomingConnection()) { - logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", - new Object[]{selectQuery, fileToProcess, e}); + logger.error("Unable to execute SQL select query [{}] for {} routing to failure", selectQuery, fileToProcess, e); fileToProcess = session.penalize(fileToProcess); } else { - logger.error("Unable to execute SQL select query {} due to {}; routing to failure", - new Object[]{selectQuery, e}); + logger.error("Unable to execute SQL select query [{}] routing to failure", selectQuery, e); context.yield(); } session.putAttribute(fileToProcess,RESULT_ERROR_MESSAGE,e.getMessage()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index cc819db405..850657b97e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -16,36 +16,41 @@ */ package org.apache.nifi.processors.standard; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - +import org.apache.nifi.annotation.behavior.DynamicProperties; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter; import org.apache.nifi.processors.standard.sql.SqlWriter; import org.apache.nifi.util.db.JdbcCommon; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.nifi.util.db.AvroUtil.CodecType; import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION; import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE; import static org.apache.nifi.util.db.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO; import static org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES; -import static org.apache.nifi.util.db.AvroUtil.CodecType; @EventDriven @InputRequirement(Requirement.INPUT_ALLOWED) @@ -60,7 +65,9 @@ import static org.apache.nifi.util.db.AvroUtil.CodecType; + "FlowFile attribute 'executesql.row.count' indicates how many rows were selected.") @ReadsAttributes({ @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer " - + "that represents the JDBC Type of the parameter."), + + "that represents the JDBC Type of the parameter. The following types are accepted: [LONGNVARCHAR: -16], [BIT: -7], [BOOLEAN: 16], [TINYINT: -6], [BIGINT: -5], " + + "[LONGVARBINARY: -4], [VARBINARY: -3], [BINARY: -2], [LONGVARCHAR: -1], [CHAR: 1], [NUMERIC: 2], [DECIMAL: 3], [INTEGER: 4], [SMALLINT: 5] " + + "[FLOAT: 6], [REAL: 7], [DOUBLE: 8], [VARCHAR: 12], [DATE: 91], [TIME: 92], [TIMESTAMP: 93], [VARCHAR: 12], [CLOB: 2005], [NCLOB: 2011]"), @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as " + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), @ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. " @@ -102,6 +109,33 @@ import static org.apache.nifi.util.db.AvroUtil.CodecType; @WritesAttribute(attribute = "input.flowfile.uuid", description = "If the processor has an incoming connection, outgoing FlowFiles will have this attribute " + "set to the value of the input FlowFile's UUID. If there is no incoming connection, the attribute will not be added.") }) +@SupportsSensitiveDynamicProperties +@DynamicProperties({ + @DynamicProperty(name = "sql.args.N.type", + value = "SQL type argument to be supplied", + description = "Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer " + + "that represents the JDBC Type of the parameter. The following types are accepted: [LONGNVARCHAR: -16], [BIT: -7], [BOOLEAN: 16], [TINYINT: -6], [BIGINT: -5], " + + "[LONGVARBINARY: -4], [VARBINARY: -3], [BINARY: -2], [LONGVARCHAR: -1], [CHAR: 1], [NUMERIC: 2], [DECIMAL: 3], [INTEGER: 4], [SMALLINT: 5] " + + "[FLOAT: 6], [REAL: 7], [DOUBLE: 8], [VARCHAR: 12], [DATE: 91], [TIME: 92], [TIMESTAMP: 93], [VARCHAR: 12], [CLOB: 2005], [NCLOB: 2011]"), + @DynamicProperty(name = "sql.args.N.value", + value = "Argument to be supplied", + description = "Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as " + + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), + @DynamicProperty(name = "sql.args.N.format", + value = "SQL format argument to be supplied", + description = "This attribute is always optional, but default options may not always work for your data. " + + "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases " + + "a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - " + + "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. " + + "base64: the string is a Base64 encoded string that can be decoded to bytes. " + + "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. " + + "Dates/Times/Timestamps - " + + "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') " + + "as specified according to java.time.format.DateTimeFormatter. " + + "If not specified, a long value input is expected to be an unix epoch (milli seconds from 1970/1/1), or a string value in " + + "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines e.g. Derby or MySQL do not support milliseconds and will truncate milliseconds), " + + "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.") +}) public class ExecuteSQL extends AbstractExecuteSQL { public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder() @@ -157,4 +191,15 @@ public class ExecuteSQL extends AbstractExecuteSQL { .build(); return new DefaultAvroSqlWriter(options); } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .dynamic(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .build(); + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java index 2ebddacef0..cd36e42fef 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java @@ -16,20 +16,25 @@ */ package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.DynamicProperties; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.sql.RecordSqlWriter; import org.apache.nifi.processors.standard.sql.SqlWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; @@ -59,7 +64,9 @@ import static org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES; + "FlowFile attribute 'executesql.row.count' indicates how many rows were selected.") @ReadsAttributes({ @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer " - + "that represents the JDBC Type of the parameter."), + + "that represents the JDBC Type of the parameter. The following types are accepted: [LONGNVARCHAR: -16], [BIT: -7], [BOOLEAN: 16], [TINYINT: -6], [BIGINT: -5], " + + "[LONGVARBINARY: -4], [VARBINARY: -3], [BINARY: -2], [LONGVARCHAR: -1], [CHAR: 1], [NUMERIC: 2], [DECIMAL: 3], [INTEGER: 4], [SMALLINT: 5] " + + "[FLOAT: 6], [REAL: 7], [DOUBLE: 8], [VARCHAR: 12], [DATE: 91], [TIME: 92], [TIMESTAMP: 93], [VARCHAR: 12], [CLOB: 2005], [NCLOB: 2011]"), @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as " + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), @ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. " @@ -99,6 +106,33 @@ import static org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES; @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."), @WritesAttribute(attribute = "record.count", description = "The number of records output by the Record Writer.") }) +@SupportsSensitiveDynamicProperties +@DynamicProperties({ + @DynamicProperty(name = "sql.args.N.type", + value = "SQL type argument to be supplied", + description = "Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer " + + "that represents the JDBC Type of the parameter. The following types are accepted: [LONGNVARCHAR: -16], [BIT: -7], [BOOLEAN: 16], [TINYINT: -6], [BIGINT: -5], " + + "[LONGVARBINARY: -4], [VARBINARY: -3], [BINARY: -2], [LONGVARCHAR: -1], [CHAR: 1], [NUMERIC: 2], [DECIMAL: 3], [INTEGER: 4], [SMALLINT: 5] " + + "[FLOAT: 6], [REAL: 7], [DOUBLE: 8], [VARCHAR: 12], [DATE: 91], [TIME: 92], [TIMESTAMP: 93], [VARCHAR: 12], [CLOB: 2005], [NCLOB: 2011]"), + @DynamicProperty(name = "sql.args.N.value", + value = "Argument to be supplied", + description = "Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as " + + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), + @DynamicProperty(name = "sql.args.N.format", + value = "SQL format argument to be supplied", + description = "This attribute is always optional, but default options may not always work for your data. " + + "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases " + + "a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - " + + "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. " + + "base64: the string is a Base64 encoded string that can be decoded to bytes. " + + "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. " + + "Dates/Times/Timestamps - " + + "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') " + + "as specified according to java.time.format.DateTimeFormatter. " + + "If not specified, a long value input is expected to be an unix epoch (milli seconds from 1970/1/1), or a string value in " + + "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines e.g. Derby or MySQL do not support milliseconds and will truncate milliseconds), " + + "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.") +}) public class ExecuteSQLRecord extends AbstractExecuteSQL { @@ -161,4 +195,15 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL { return new RecordSqlWriter(recordSetWriterFactory, options, maxRowsPerFlowFile, fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes()); } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .dynamic(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .build(); + } }