Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1677#discussion_r112190994
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 ---
    @@ -0,0 +1,1067 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.dbcp.DBCPService;
    +import org.apache.nifi.expression.AttributeExpression;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RowRecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.sql.Connection;
    +import java.sql.DatabaseMetaData;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.ResultSetMetaData;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.stream.IntStream;
    +
    +
    +@EventDriven
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"sql", "record", "convert", "jdbc", "put", "database"})
    +@SeeAlso({ConvertJSONToSQL.class, PutSQL.class})
    +@CapabilityDescription("The PutDatabaseRecord processor uses a specified 
RecordReader to input (possibly multiple) records from an incoming flow file. 
These records are translated to SQL "
    +        + "statements and executed as a single batch. If any errors occur, 
the flow file is routed to failure or retry, and if the records are transmitted 
successfully, the incoming flow file is "
    +        + "routed to success.  The type of statement executed by the 
processor is specified via the Statement Type property, which accepts some 
hard-coded values such as INSERT, UPDATE, and DELETE, "
    +        + "as well as 'Use statement.type Attribute', which causes the 
processor to get the statement type from a flow file attribute.")
    +@ReadsAttribute(attribute = PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, 
description = "If 'Use statement.type Attribute' is selected for the Statement 
Type property, the value of this attribute "
    +        + "will be used to determine the type of statement (INSERT, 
UPDATE, DELETE, SQL, etc.) to generate and execute.")
    +public class PutDatabaseRecord extends AbstractProcessor {
    +
    +    static final String UPDATE_TYPE = "UPDATE";
    +    static final String INSERT_TYPE = "INSERT";
    +    static final String DELETE_TYPE = "DELETE";
    +    static final String SQL_TYPE = "SQL";   // Not an allowable value in 
the Statement Type property, must be set by attribute
    +    static final String USE_ATTR_TYPE = "Use statement.type Attribute";
    +
    +    static final String STATEMENT_TYPE_ATTRIBUTE = "statement.type";
    +
    +    static final AllowableValue IGNORE_UNMATCHED_FIELD = new 
AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
    +            "Any field in the document that cannot be mapped to a column 
in the database is ignored");
    +    static final AllowableValue FAIL_UNMATCHED_FIELD = new 
AllowableValue("Fail", "Fail",
    +            "If the document has any field that cannot be mapped to a 
column in the database, the FlowFile will be routed to the failure 
relationship");
    +    static final AllowableValue IGNORE_UNMATCHED_COLUMN = new 
AllowableValue("Ignore Unmatched Columns",
    +            "Ignore Unmatched Columns",
    +            "Any column in the database that does not have a field in the 
document will be assumed to not be required.  No notification will be logged");
    +    static final AllowableValue WARNING_UNMATCHED_COLUMN = new 
AllowableValue("Warn on Unmatched Columns",
    +            "Warn on Unmatched Columns",
    +            "Any column in the database that does not have a field in the 
document will be assumed to not be required.  A warning will be logged");
    +    static final AllowableValue FAIL_UNMATCHED_COLUMN = new 
AllowableValue("Fail on Unmatched Columns",
    +            "Fail on Unmatched Columns",
    +            "A flow will fail if any column in the database that does not 
have a field in the document.  An error will be logged");
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("Successfully created FlowFile from SQL query 
result set.")
    +            .build();
    +
    +    static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("A FlowFile is routed to this relationship if the 
database cannot be updated but attempting the operation again may succeed")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile is routed to this relationship if the 
database cannot be updated and retrying the operation will also fail, "
    +                    + "such as an invalid query or an integrity constraint 
violation")
    +            .build();
    +
    +    protected static Set<Relationship> relationships;
    +
    +    // Properties
    +    static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for 
parsing incoming data and determining the data's schema.")
    +            .identifiesControllerService(RowRecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor STATEMENT_TYPE = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-statement-type")
    +            .displayName("Statement Type")
    +            .description("Specifies the type of SQL Statement to generate. 
If 'Use statement.type Attribute' is chosen, then the value is taken from the 
statement.type attribute in the "
    +                    + "FlowFile. The 'Use statement.type Attribute' option 
is the only one that allows the 'SQL' statement type. If 'SQL' is specified, 
the value of the field specified by the "
    +                    + "'Field Containing SQL' property is expected to be a 
valid SQL statement on the target database, and will be executed as-is.")
    +            .required(true)
    +            .allowableValues(UPDATE_TYPE, INSERT_TYPE, DELETE_TYPE, 
USE_ATTR_TYPE)
    +            .build();
    +
    +    static final PropertyDescriptor DBCP_SERVICE = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-dcbp-service")
    +            .displayName("Database Connection Pooling Service")
    +            .description("The Controller Service that is used to obtain a 
connection to the database for sending records.")
    +            .required(true)
    +            .identifiesControllerService(DBCPService.class)
    +            .build();
    +
    +    static final PropertyDescriptor CATALOG_NAME = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-catalog-name")
    +            .displayName("Catalog Name")
    +            .description("The name of the catalog that the statement 
should update. This may not apply for the database that you are updating. In 
this case, leave the field empty")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor SCHEMA_NAME = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-schema-name")
    +            .displayName("Schema Name")
    +            .description("The name of the schema that the table belongs 
to. This may not apply for the database that you are updating. In this case, 
leave the field empty")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the table that the statement should 
affect.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-translate-field-names")
    +            .displayName("Translate Field Names")
    +            .description("If true, the Processor will attempt to translate 
field names into the appropriate column names for the table specified. "
    +                    + "If false, the field names must match the column 
names exactly, or the column will not be updated")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-unmatched-field-behavior")
    +            .displayName("Unmatched Field Behavior")
    +            .description("If an incoming record has a field that does not 
map to any of the database table's columns, this property specifies how to 
handle the situation")
    +            .allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD)
    +            .defaultValue(IGNORE_UNMATCHED_FIELD.getValue())
    +            .build();
    +
    +    static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-unmatched-column-behavior")
    +            .displayName("Unmatched Column Behavior")
    +            .description("If an incoming record does not have a field 
mapping for all of the database table's columns, this property specifies how to 
handle the situation")
    +            .allowableValues(IGNORE_UNMATCHED_COLUMN, 
WARNING_UNMATCHED_COLUMN, FAIL_UNMATCHED_COLUMN)
    +            .defaultValue(FAIL_UNMATCHED_COLUMN.getValue())
    +            .build();
    +
    +    static final PropertyDescriptor UPDATE_KEYS = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-update-keys")
    +            .displayName("Update Keys")
    +            .description("A comma-separated list of column names that 
uniquely identifies a row in the database for UPDATE statements. "
    +                    + "If the Statement Type is UPDATE and this property 
is not set, the table's Primary Keys are used. "
    +                    + "In this case, if no Primary Key exists, the 
conversion to SQL will fail if Unmatched Column Behaviour is set to FAIL. "
    +                    + "This property is ignored if the Statement Type is 
INSERT")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    static final PropertyDescriptor FIELD_CONTAINING_SQL = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-field-containing-sql")
    +            .displayName("Field Containing SQL")
    +            .description("If the Statement Type is 'SQL' (as set in the 
statement.type attribute), this field indicates which field in the record(s) 
contains the SQL statement to execute. The value "
    +                    + "of the field must be a single SQL statement. If the 
Statement Type is not 'SQL', this field is ignored.")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    static final PropertyDescriptor QUOTED_IDENTIFIERS = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-quoted-identifiers")
    +            .displayName("Quote Column Identifiers")
    +            .description("Enabling this option will cause all column names 
to be quoted, allowing you to use reserved words as column names in your 
tables.")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +    static final PropertyDescriptor QUOTED_TABLE_IDENTIFIER = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-quoted-table-identifiers")
    +            .displayName("Quote Table Identifiers")
    +            .description("Enabling this option will cause the table name 
to be quoted to support the use of special characters in the table name.")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +    static final PropertyDescriptor QUERY_TIMEOUT = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-query-timeout")
    +            .displayName("Max Wait Time")
    +            .description("The maximum amount of time allowed for a running 
SQL statement "
    +                    + ", zero means there is no limit. Max time less than 
1 second will be equal to zero.")
    +            .defaultValue("0 seconds")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("put-db-record-batch-size")
    +            .displayName("Batch Size")
    +            .description("The preferred number of FlowFiles to put to the 
database in a single transaction")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    protected static List<PropertyDescriptor> propDescriptors;
    +
    +    private final Map<SchemaKey, TableSchema> schemaCache = new 
LinkedHashMap<SchemaKey, TableSchema>(100) {
    +        private static final long serialVersionUID = 1L;
    +
    +        @Override
    +        protected boolean removeEldestEntry(Map.Entry<SchemaKey, 
TableSchema> eldest) {
    +            return size() >= 100;
    +        }
    +    };
    +
    +
    +    static {
    +        final Set<Relationship> r = new HashSet<>();
    +        r.add(REL_SUCCESS);
    +        r.add(REL_FAILURE);
    +        r.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(r);
    +
    +        final List<PropertyDescriptor> pds = new ArrayList<>();
    +        pds.add(RECORD_READER_FACTORY);
    +        pds.add(STATEMENT_TYPE);
    +        pds.add(DBCP_SERVICE);
    +        pds.add(CATALOG_NAME);
    +        pds.add(SCHEMA_NAME);
    +        pds.add(TABLE_NAME);
    +        pds.add(TRANSLATE_FIELD_NAMES);
    +        pds.add(UNMATCHED_FIELD_BEHAVIOR);
    +        pds.add(UNMATCHED_COLUMN_BEHAVIOR);
    +        pds.add(UPDATE_KEYS);
    +        pds.add(FIELD_CONTAINING_SQL);
    +        pds.add(QUOTED_IDENTIFIERS);
    +        pds.add(QUOTED_TABLE_IDENTIFIER);
    +        pds.add(QUERY_TIMEOUT);
    +        pds.add(BATCH_SIZE);
    +
    +        propDescriptors = Collections.unmodifiableList(pds);
    +    }
    +
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propDescriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .required(false)
    +                
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
    +                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        synchronized (this) {
    +            schemaCache.clear();
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final ComponentLog log = getLogger();
    +
    +        final RowRecordReaderFactory recordParserFactory = 
context.getProperty(RECORD_READER_FACTORY)
    +                .asControllerService(RowRecordReaderFactory.class);
    +        final String statementTypeProperty = 
context.getProperty(STATEMENT_TYPE).getValue();
    +        final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
    +        final boolean translateFieldNames = 
context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
    +        final boolean ignoreUnmappedFields = 
IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
    +        final Integer queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
    +
    +        // Is the unmatched column behaviour fail or warning?
    +        final boolean failUnmappedColumns = 
FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
    +        final boolean warningUnmappedColumns = 
WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
    +
    +        // Escape column names?
    +        final boolean escapeColumnNames = 
context.getProperty(QUOTED_IDENTIFIERS).asBoolean();
    +
    +        // Quote table name?
    +        final boolean quoteTableName = 
context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
    +
    +        try (final Connection con = dbcpService.getConnection()) {
    +
    +            String jdbcURL = "DBCPService";
    +            try {
    +                DatabaseMetaData databaseMetaData = con.getMetaData();
    +                if (databaseMetaData != null) {
    +                    jdbcURL = databaseMetaData.getURL();
    +                }
    +            } catch (SQLException se) {
    +                // Ignore and use default JDBC URL. This shouldn't happen 
unless the driver doesn't implement getMetaData() properly
    +            }
    +
    +            final String catalog = 
context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +            final String schemaName = 
context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +            final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +            final String updateKeys = 
context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
    +            final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
    +
    +            // Get the statement type from the attribute if necessary
    +            String statementType = statementTypeProperty;
    +            if (USE_ATTR_TYPE.equals(statementTypeProperty)) {
    +                statementType = 
flowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
    +            }
    +            if (StringUtils.isEmpty(statementType)) {
    +                log.error("Statement Type is not specified, flowfile {} 
will be penalized and routed to failure", new Object[]{flowFile});
    +                flowFile = session.penalize(flowFile);
    +                session.transfer(flowFile, REL_FAILURE);
    +            } else {
    +                RecordSchema recordSchema;
    +                try (final InputStream in = session.read(flowFile)) {
    +
    +                    final RecordReader recordParser = 
recordParserFactory.createRecordReader(flowFile, in, log);
    +                    recordSchema = recordParser.getSchema();
    +
    +                    if (SQL_TYPE.equalsIgnoreCase(statementType)) {
    +
    +                        // Find which field has the SQL statement in it
    +                        final String sqlField = 
context.getProperty(FIELD_CONTAINING_SQL).evaluateAttributeExpressions(flowFile).getValue();
    +                        if (StringUtils.isEmpty(sqlField)) {
    +                            log.error("SQL specified as Statement Type but 
no Field Containing SQL was found, flowfile {} will be penalized and routed to 
failure", new Object[]{flowFile});
    +                            flowFile = session.penalize(flowFile);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                        } else {
    +                            boolean schemaHasSqlField = 
recordSchema.getFields().stream().anyMatch((field) -> 
sqlField.equals(field.getFieldName()));
    +                            if (schemaHasSqlField) {
    +                                try (Statement s = con.createStatement()) {
    +
    +                                    try {
    +                                        s.setQueryTimeout(queryTimeout); 
// timeout in seconds
    +                                    } catch (SQLException se) {
    +                                        // If the driver doesn't support 
query timeout, then assume it is "infinite". Allow a timeout of zero only
    +                                        if (queryTimeout > 0) {
    +                                            throw se;
    +                                        }
    +                                    }
    +
    +                                    Record currentRecord;
    +                                    while ((currentRecord = 
recordParser.nextRecord()) != null) {
    +                                        Object sql = 
currentRecord.getValue(sqlField);
    +                                        if (sql != null && 
!StringUtils.isEmpty((String) sql)) {
    +                                            // Execute the statement as-is
    +                                            s.execute((String) sql);
    +                                        } else {
    +                                            log.error("Record had no (or 
null) value for Field Containing SQL: {}, flowfile {} will be penalized and 
routed to failure",
    +                                                    new Object[]{sqlField, 
flowFile});
    +                                            flowFile = 
session.penalize(flowFile);
    +                                            session.transfer(flowFile, 
REL_FAILURE);
    +                                        }
    +                                    }
    +                                    session.transfer(flowFile, 
REL_SUCCESS);
    +                                    
session.getProvenanceReporter().send(flowFile, jdbcURL);
    +                                } catch (final SQLException e) {
    +                                    log.error("Unable to update database 
due to {}, flowfile {} will be penalized and routed to failure", new 
Object[]{e.getMessage(), flowFile}, e);
    +                                    flowFile = session.penalize(flowFile);
    +                                    session.transfer(flowFile, 
REL_FAILURE);
    +                                }
    +                            } else {
    +                                log.warn("Record schema does not contain 
Field Containing SQL: {}, flowfile {} will be penalized and routed to failure", 
new Object[]{sqlField, flowFile});
    +                                flowFile = session.penalize(flowFile);
    +                                session.transfer(flowFile, REL_FAILURE);
    +                            }
    +                        }
    +
    +                    } else {
    +                        // Ensure the table name has been set, the 
generated SQL statements (and TableSchema cache) will need it
    +                        if(StringUtils.isEmpty(tableName)) {
    +                            log.error("Cannot process {} because Table 
Name is null or empty; penalizing and routing to failure", new 
Object[]{flowFile});
    +                            flowFile = session.penalize(flowFile);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            return;
    +                        }
    +
    +                        final boolean includePrimaryKeys = 
UPDATE_TYPE.equals(statementType) && updateKeys == null;
    --- End diff --
    
    Agree, another copy-paste error on my part, will change


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to