This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 214bd2f00c NIFI-7862: Add UpdateDatabaseTable processor for auto table 
structure updates
214bd2f00c is described below

commit 214bd2f00c78b5b7b35b46ac82defa3692752ce2
Author: Matthew Burgess <[email protected]>
AuthorDate: Mon Nov 15 17:55:57 2021 -0500

    NIFI-7862: Add UpdateDatabaseTable processor for auto table structure 
updates
    
    Signed-off-by: Joe Gresock <[email protected]>
    
    This closes #5523.
---
 .../processors/standard/PutDatabaseRecord.java     | 213 +------
 .../processors/standard/UpdateDatabaseTable.java   | 635 +++++++++++++++++++++
 .../processors/standard/db/ColumnDescription.java  |  95 +++
 .../processors/standard/db/DatabaseAdapter.java    |  94 ++-
 .../standard/db/TableNotFoundException.java        |  28 +
 .../nifi/processors/standard/db/TableSchema.java   | 139 +++++
 .../standard/db/impl/MSSQLDatabaseAdapter.java     |  29 +
 .../standard/db/impl/MySQLDatabaseAdapter.java     |  75 +++
 .../standard/db/impl/Oracle12DatabaseAdapter.java  |  64 +++
 .../standard/db/impl/OracleDatabaseAdapter.java    |  67 +++
 .../standard/db/impl/PhoenixDatabaseAdapter.java   |  68 +++
 .../db/impl/PostgreSQLDatabaseAdapter.java         |  94 ++-
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../processors/standard/PutDatabaseRecordTest.java |  55 +-
 .../standard/TestUpdateDatabaseTable.java          | 388 +++++++++++++
 .../standard/db/impl/DerbyDatabaseAdapter.java     | 103 ++++
 16 files changed, 1904 insertions(+), 244 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 8c5649b921..5e94200b44 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -43,7 +43,9 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processors.standard.db.ColumnDescription;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.TableSchema;
 import org.apache.nifi.record.path.FieldValue;
 import org.apache.nifi.record.path.RecordPath;
 import org.apache.nifi.record.path.RecordPathResult;
@@ -68,8 +70,6 @@ import java.sql.Clob;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLDataException;
 import java.sql.SQLException;
 import java.sql.SQLIntegrityConstraintViolationException;
@@ -81,7 +81,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -723,7 +722,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                         final DataType dataType = 
dataTypes.get(currentFieldIndex);
                         final int fieldSqlType = 
DataTypeUtils.getSQLTypeValue(dataType);
                         final String fieldName = 
recordSchema.getField(currentFieldIndex).getFieldName();
-                        String columnName = normalizeColumnName(fieldName, 
settings.translateFieldNames);
+                        String columnName = 
ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames);
                         int sqlType;
 
                         final ColumnDescription column = 
columns.get(columnName);
@@ -736,7 +735,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                                 sqlType = fieldSqlType;
                             }
                         } else {
-                            sqlType = column.dataType;
+                            sqlType = column.getDataType();
                         }
 
                         // Convert (if necessary) from field data type to 
column data type
@@ -780,7 +779,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                         // If DELETE type, insert the object twice if the 
column is nullable because of the null check (see generateDelete for details)
                         if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
                             setParameter(ps, ++deleteIndex, currentValue, 
fieldSqlType, sqlType);
-                            if (column.isNullable()) {
+                            if (column != null && column.isNullable()) {
                                 setParameter(ps, ++deleteIndex, currentValue, 
fieldSqlType, sqlType);
                             }
                         } else if 
(UPSERT_TYPE.equalsIgnoreCase(statementType)) {
@@ -1005,7 +1004,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
     private Set<String> getNormalizedColumnNames(final RecordSchema schema, 
final boolean translateFieldNames) {
         final Set<String> normalizedFieldNames = new HashSet<>();
         if (schema != null) {
-            schema.getFieldNames().forEach((fieldName) -> 
normalizedFieldNames.add(normalizeColumnName(fieldName, translateFieldNames)));
+            schema.getFieldNames().forEach((fieldName) -> 
normalizedFieldNames.add(ColumnDescription.normalizeColumnName(fieldName, 
translateFieldNames)));
         }
         return normalizedFieldNames;
     }
@@ -1031,7 +1030,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final ColumnDescription desc = 
tableSchema.getColumns().get(normalizeColumnName(fieldName, 
settings.translateFieldNames));
+                final ColumnDescription desc = 
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, 
settings.translateFieldNames));
                 if (desc == null && !settings.ignoreUnmappedFields) {
                     throw new SQLDataException("Cannot map field '" + 
fieldName + "' to any column in the database\n"
                             + (settings.translateFieldNames ? "Normalized " : 
"") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@@ -1090,7 +1089,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final ColumnDescription desc = 
tableSchema.getColumns().get(normalizeColumnName(fieldName, 
settings.translateFieldNames));
+                final ColumnDescription desc = 
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, 
settings.translateFieldNames));
                 if (desc == null && !settings.ignoreUnmappedFields) {
                     throw new SQLDataException("Cannot map field '" + 
fieldName + "' to any column in the database\n"
                             + (settings.translateFieldNames ? "Normalized " : 
"") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@@ -1136,7 +1135,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final ColumnDescription desc = 
tableSchema.getColumns().get(normalizeColumnName(fieldName, 
settings.translateFieldNames));
+                final ColumnDescription desc = 
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, 
settings.translateFieldNames));
                 if (desc == null && !settings.ignoreUnmappedFields) {
                     throw new SQLDataException("Cannot map field '" + 
fieldName + "' to any column in the database\n"
                             + (settings.translateFieldNames ? "Normalized " : 
"") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@@ -1166,7 +1165,6 @@ public class PutDatabaseRecord extends AbstractProcessor {
                                          final TableSchema tableSchema, final 
DMLSettings settings)
             throws IllegalArgumentException, MalformedRecordException, 
SQLException {
 
-
         final Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, 
updateKeys, tableSchema);
         final Set<String> normalizedKeyColumnNames = 
normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, 
keyColumnNames, tableSchema.getQuotedIdentifierString());
 
@@ -1174,7 +1172,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         sqlBuilder.append("UPDATE ");
         sqlBuilder.append(tableName);
 
-        // iterate over all of the fields in the record, building the SQL 
statement by adding the column names
+        // iterate over all the fields in the record, building the SQL 
statement by adding the column names
         List<String> fieldNames = recordSchema.getFieldNames();
         final List<Integer> includedColumns = new ArrayList<>();
         if (fieldNames != null) {
@@ -1187,8 +1185,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final String normalizedColName = 
normalizeColumnName(fieldName, settings.translateFieldNames);
-                final ColumnDescription desc = 
tableSchema.getColumns().get(normalizeColumnName(fieldName, 
settings.translateFieldNames));
+                final String normalizedColName = 
ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames);
+                final ColumnDescription desc = 
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, 
settings.translateFieldNames));
                 if (desc == null) {
                     if (!settings.ignoreUnmappedFields) {
                         throw new SQLDataException("Cannot map field '" + 
fieldName + "' to any column in the database\n"
@@ -1227,8 +1225,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 String fieldName = field.getFieldName();
                 boolean firstUpdateKey = true;
 
-                final String normalizedColName = 
normalizeColumnName(fieldName, settings.translateFieldNames);
-                final ColumnDescription desc = 
tableSchema.getColumns().get(normalizeColumnName(fieldName, 
settings.translateFieldNames));
+                final String normalizedColName = 
ColumnDescription.normalizeColumnName(fieldName, settings.translateFieldNames);
+                final ColumnDescription desc = 
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, 
settings.translateFieldNames));
                 if (desc != null) {
 
                     // Check if this column is a Update Key. If so, add it to 
the WHERE clause
@@ -1263,7 +1261,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
         final Set<String> normalizedFieldNames = 
getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
         for (final String requiredColName : 
tableSchema.getRequiredColumnNames()) {
-            final String normalizedColName = 
normalizeColumnName(requiredColName, settings.translateFieldNames);
+            final String normalizedColName = 
ColumnDescription.normalizeColumnName(requiredColName, 
settings.translateFieldNames);
             if (!normalizedFieldNames.contains(normalizedColName)) {
                 String missingColMessage = "Record does not have a value for 
the Required column '" + requiredColName + "'";
                 if (settings.failUnmappedColumns) {
@@ -1292,7 +1290,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final ColumnDescription desc = 
tableSchema.getColumns().get(normalizeColumnName(fieldName, 
settings.translateFieldNames));
+                final ColumnDescription desc = 
tableSchema.getColumns().get(ColumnDescription.normalizeColumnName(fieldName, 
settings.translateFieldNames));
                 if (desc == null && !settings.ignoreUnmappedFields) {
                     throw new SQLDataException("Cannot map field '" + 
fieldName + "' to any column in the database\n"
                             + (settings.translateFieldNames ? "Normalized " : 
"") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
@@ -1345,7 +1343,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         final Set<String> normalizedFieldNames = 
getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
 
         for (final String requiredColName : 
tableSchema.getRequiredColumnNames()) {
-            final String normalizedColName = 
normalizeColumnName(requiredColName, settings.translateFieldNames);
+            final String normalizedColName = 
ColumnDescription.normalizeColumnName(requiredColName, 
settings.translateFieldNames);
             if (!normalizedFieldNames.contains(normalizedColName)) {
                 String missingColMessage = "Record does not have a value for 
the Required column '" + requiredColName + "'";
                 if (settings.failUnmappedColumns) {
@@ -1385,7 +1383,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
         final Set<String> normalizedKeyColumnNames = new HashSet<>();
         for (final String updateKeyColumnName : updateKeyColumnNames) {
-            String normalizedKeyColumnName = 
normalizeColumnName(updateKeyColumnName, settings.translateFieldNames);
+            String normalizedKeyColumnName = 
ColumnDescription.normalizeColumnName(updateKeyColumnName, 
settings.translateFieldNames);
 
             if (!normalizedRecordFieldNames.contains(normalizedKeyColumnName)) 
{
                 String missingColMessage = "Record does not have a value for 
the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + 
updateKeyColumnName + "'";
@@ -1402,181 +1400,6 @@ public class PutDatabaseRecord extends 
AbstractProcessor {
         return normalizedKeyColumnNames;
     }
 
-    private static String normalizeColumnName(final String colName, final 
boolean translateColumnNames) {
-        return colName == null ? null : (translateColumnNames ? 
colName.toUpperCase().replace("_", "") : colName);
-    }
-
-    static class TableSchema {
-        private List<String> requiredColumnNames;
-        private Set<String> primaryKeyColumnNames;
-        private Map<String, ColumnDescription> columns;
-        private String quotedIdentifierString;
-
-        TableSchema(final List<ColumnDescription> columnDescriptions, final 
boolean translateColumnNames,
-                            final Set<String> primaryKeyColumnNames, final 
String quotedIdentifierString) {
-            this.columns = new LinkedHashMap<>();
-            this.primaryKeyColumnNames = primaryKeyColumnNames;
-            this.quotedIdentifierString = quotedIdentifierString;
-
-            this.requiredColumnNames = new ArrayList<>();
-            for (final ColumnDescription desc : columnDescriptions) {
-                columns.put(normalizeColumnName(desc.columnName, 
translateColumnNames), desc);
-                if (desc.isRequired()) {
-                    requiredColumnNames.add(desc.columnName);
-                }
-            }
-        }
-
-        public Map<String, ColumnDescription> getColumns() {
-            return columns;
-        }
-
-        public List<ColumnDescription> getColumnsAsList() {
-            return new ArrayList<>(columns.values());
-        }
-
-        public List<String> getRequiredColumnNames() {
-            return requiredColumnNames;
-        }
-
-        public Set<String> getPrimaryKeyColumnNames() {
-            return primaryKeyColumnNames;
-        }
-
-        public String getQuotedIdentifierString() {
-            return quotedIdentifierString;
-        }
-
-        public static TableSchema from(final Connection conn, final String 
catalog, final String schema, final String tableName,
-                                       final boolean translateColumnNames, 
final String updateKeys, ComponentLog log) throws SQLException {
-            final DatabaseMetaData dmd = conn.getMetaData();
-
-            try (final ResultSet colrs = dmd.getColumns(catalog, schema, 
tableName, "%")) {
-                final List<ColumnDescription> cols = new ArrayList<>();
-                while (colrs.next()) {
-                    final ColumnDescription col = 
ColumnDescription.from(colrs);
-                    cols.add(col);
-                }
-                // If no columns are found, check that the table exists
-                if (cols.isEmpty()) {
-                    try (final ResultSet tblrs = dmd.getTables(catalog, 
schema, tableName, null)) {
-                        List<String> qualifiedNameSegments = new ArrayList<>();
-                        if (catalog != null) {
-                            qualifiedNameSegments.add(catalog);
-                        }
-                        if (schema != null) {
-                            qualifiedNameSegments.add(schema);
-                        }
-                        if (tableName != null) {
-                            qualifiedNameSegments.add(tableName);
-                        }
-                        if (!tblrs.next()) {
-
-                            throw new SQLException("Table "
-                                    + String.join(".", qualifiedNameSegments)
-                                    + " not found, ensure the Catalog, Schema, 
and/or Table Names match those in the database exactly");
-                        } else {
-                            log.warn("Table "
-                                    + String.join(".", qualifiedNameSegments)
-                                    + " found but no columns were found, if 
this is not expected then check the user permissions for getting table metadata 
from the database");
-                        }
-                    }
-                }
-
-                final Set<String> primaryKeyColumns = new HashSet<>();
-                if (updateKeys == null) {
-                    try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, 
schema, tableName)) {
-
-                        while (pkrs.next()) {
-                            final String colName = 
pkrs.getString("COLUMN_NAME");
-                            primaryKeyColumns.add(normalizeColumnName(colName, 
translateColumnNames));
-                        }
-                    }
-                } else {
-                    // Parse the Update Keys field and normalize the column 
names
-                    for (final String updateKey : updateKeys.split(",")) {
-                        
primaryKeyColumns.add(normalizeColumnName(updateKey.trim(), 
translateColumnNames));
-                    }
-                }
-
-                return new TableSchema(cols, translateColumnNames, 
primaryKeyColumns, dmd.getIdentifierQuoteString());
-            }
-        }
-
-        @Override
-        public String toString() {
-            return "TableSchema[columns=" + columns.values() + "]";
-        }
-    }
-
-    protected static class ColumnDescription {
-        private final String columnName;
-        private final int dataType;
-        private final boolean required;
-        private final Integer columnSize;
-        private final boolean nullable;
-
-        public ColumnDescription(final String columnName, final int dataType, 
final boolean required, final Integer columnSize, final boolean nullable) {
-            this.columnName = columnName;
-            this.dataType = dataType;
-            this.required = required;
-            this.columnSize = columnSize;
-            this.nullable = nullable;
-        }
-
-        public int getDataType() {
-            return dataType;
-        }
-
-        public Integer getColumnSize() {
-            return columnSize;
-        }
-
-        public String getColumnName() {
-            return columnName;
-        }
-
-        public boolean isRequired() {
-            return required;
-        }
-
-        public boolean isNullable() {
-            return nullable;
-        }
-
-        public static ColumnDescription from(final ResultSet resultSet) throws 
SQLException {
-            final ResultSetMetaData md = resultSet.getMetaData();
-            List<String> columns = new ArrayList<>();
-
-            for (int i = 1; i < md.getColumnCount() + 1; i++) {
-                columns.add(md.getColumnName(i));
-            }
-            // COLUMN_DEF must be read first to work around Oracle bug, see 
NIFI-4279 for details
-            final String defaultValue = resultSet.getString("COLUMN_DEF");
-            final String columnName = resultSet.getString("COLUMN_NAME");
-            final int dataType = resultSet.getInt("DATA_TYPE");
-            final int colSize = resultSet.getInt("COLUMN_SIZE");
-
-            final String nullableValue = resultSet.getString("IS_NULLABLE");
-            final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) 
|| nullableValue.isEmpty();
-            String autoIncrementValue = "NO";
-
-            if (columns.contains("IS_AUTOINCREMENT")) {
-                autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
-            }
-
-            final boolean isAutoIncrement = 
"YES".equalsIgnoreCase(autoIncrementValue);
-            final boolean required = !isNullable && !isAutoIncrement && 
defaultValue == null;
-
-            return new ColumnDescription(columnName, dataType, required, 
colSize == 0 ? null : colSize, isNullable);
-        }
-
-        @Override
-        public String toString() {
-            return "Column[name=" + columnName + ", dataType=" + dataType + ", 
required=" + required + ", columnSize=" + columnSize + "]";
-        }
-    }
-
     static class SchemaKey {
         private final String catalog;
         private final String schemaName;
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
new file mode 100644
index 0000000000..8261bc86ec
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateDatabaseTable.java
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.DiscontinuedException;
+import org.apache.nifi.processors.standard.db.ColumnDescription;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.TableNotFoundException;
+import org.apache.nifi.processors.standard.db.TableSchema;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+
+import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+
+@Tags({"metadata", "jdbc", "database", "table", "update", "alter"})
+@CapabilityDescription("This processor uses a JDBC connection and incoming 
records to generate any database table changes needed to support the incoming 
records. It expects a 'flat' record layout, "
+        + "meaning none of the top-level record fields has nested fields that 
are intended to become columns themselves.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "output.table", description = "This 
attribute is written on the flow files routed to the 'success' "
+                + "and 'failure' relationships, and contains the target table 
name."),
+        @WritesAttribute(attribute = "output.path", description = "This 
attribute is written on the flow files routed to the 'success' "
+                + "and 'failure' relationships, and contains the path on the 
file system to the table (or partition location if the table is partitioned)."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer, only if a 
Record Writer is specified "
+                + "and Update Field Names is 'true'."),
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile, only if a Record Writer is specified and 
Update Field Names is 'true'.")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class UpdateDatabaseTable extends AbstractProcessor {
+
+    static final AllowableValue CREATE_IF_NOT_EXISTS = new 
AllowableValue("Create If Not Exists", "Create If Not Exists",
+            "Create a table with the given schema if it does not already 
exist");
+    static final AllowableValue FAIL_IF_NOT_EXISTS = new AllowableValue("Fail 
If Not Exists", "Fail If Not Exists",
+            "If the target does not already exist, log an error and route the 
flowfile to failure");
+
+    static final String ATTR_OUTPUT_TABLE = "output.table";
+
+    // Properties
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The service for reading incoming flow files. The 
reader is only used to determine the schema of the records, the actual records 
will not be processed.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor DBCP_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("updatedatabasetable-dbcp-service")
+            .displayName("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain 
connection(s) to the database")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAME = new 
PropertyDescriptor.Builder()
+            .name("updatedatabasetable-catalog-name")
+            .displayName("Catalog Name")
+            .description("The name of the catalog that the statement should 
update. This may not apply for the database that you are updating. In this 
case, leave the field empty. Note that if the "
+                    + "property is set and the database is case-sensitive, the 
catalog name must match the database's catalog name exactly.")
+            .required(false)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor SCHEMA_NAME = new 
PropertyDescriptor.Builder()
+            .name("updatedatabasetable-schema-name")
+            .displayName("Schema Name")
+            .description("The name of the database schema that the table 
belongs to. This may not apply for the database that you are updating. In this 
case, leave the field empty. Note that if the "
+                    + "property is set and the database is case-sensitive, the 
schema name must match the database's schema name exactly.")
+            .required(false)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("updatedatabasetable-table-name")
+            .displayName("Table Name")
+            .description("The name of the database table to update. If the 
table does not exist, then it will either be created or an error thrown, 
depending "
+                    + "on the value of the Create Table property.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CREATE_TABLE = new 
PropertyDescriptor.Builder()
+            .name("updatedatabasetable-create-table")
+            .displayName("Create Table Strategy")
+            .description("Specifies how to process the target table when it 
does not exist (create it, fail, e.g.).")
+            .required(true)
+            .addValidator(Validator.VALID)
+            .allowableValues(CREATE_IF_NOT_EXISTS, FAIL_IF_NOT_EXISTS)
+            .defaultValue(FAIL_IF_NOT_EXISTS.getValue())
+            .build();
+
    // Comma-separated record field names used as primary keys, but only when the
    // table is being created (dependsOn CREATE_TABLE = CREATE_IF_NOT_EXISTS).
    static final PropertyDescriptor PRIMARY_KEY_FIELDS = new PropertyDescriptor.Builder()
            .name("updatedatabasetable-primary-keys")
            .displayName("Primary Key Fields")
            .description("A comma-separated list of record field names that uniquely identifies a row in the database. This property is only used if the specified table needs to be created, "
                    + "in which case the Primary Key Fields will be used to specify the primary keys of the newly-created table. IMPORTANT: Primary Key Fields must match the record field "
                    + "names exactly unless 'Quote Column Identifiers' is false and the database allows for case-insensitive column names. In practice it is best to specify Primary Key Fields "
                    + "that exactly match the record field names, and those will become the column names in the created table.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(false)
            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
            .dependsOn(CREATE_TABLE, CREATE_IF_NOT_EXISTS)
            .build();
+
+    static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new 
PropertyDescriptor.Builder()
+            .name("updatedatabasetable-translate-field-names")
+            .displayName("Translate Field Names")
+            .description("If true, the Processor will attempt to translate 
field names into the corresponding column names for the table specified, for 
the purposes of determining whether "
+                    + "the field name exists as a column in the target table. 
NOTE: If the target table does not exist and is to be created, this property is 
ignored and the field names will be "
+                    + "used as-is. If false, the field names must match the 
column names exactly, or the column may not be found and instead an error my be 
reported that the column already exists.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
    // When true, the outgoing FlowFile's record schema is rewritten so field names
    // match the database's column names exactly (requires a Record Writer).
    static final PropertyDescriptor UPDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
            .name("updatedatabasetable-update-field-names")
            .displayName("Update Field Names")
            .description("This property indicates whether to update the output schema such that the field names are set to the exact column names from the specified "
                    + "table. This should be used if the incoming record field names may not match the table's column names in terms of upper- and lower-case. For example, this property should be "
                    + "set to true if the output FlowFile is destined for Oracle e.g., which expects the field names to match the column names exactly. NOTE: The value of the "
                    + "'Translate Field Names' property is ignored when updating field names; instead they are updated to match the column name as returned by the database.")
            .allowableValues("true", "false")
            .defaultValue("false")
            .required(true)
            .build();
+
    // Writer used only when 'Update Field Names' is true AND at least one field
    // name differs from its column name; otherwise the input FlowFile passes
    // through unmodified (see onTrigger).
    static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
            .name("updatedatabasetable-record-writer")
            .displayName("Record Writer")
            .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer should use Inherit Schema to emulate the inferred schema behavior, i.e. "
                    + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types. If Create Table Strategy is set "
                    + "'Create If Not Exists', the Record Writer's output format must match the Record Reader's format in order for the data to be placed in the created table location. Note that "
                    + "this property is only used if 'Update Field Names' is set to true and the field names do not all match the column names exactly. If no "
                    + "update is needed for any field names (or 'Update Field Names' is false), the Record Writer is not used and instead the input FlowFile is routed to success or failure "
                    + "without modification.")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .dependsOn(UPDATE_FIELD_NAMES, "true")
            .required(true)
            .build();
+
+    static final PropertyDescriptor QUOTE_COLUMN_IDENTIFIERS = new 
PropertyDescriptor.Builder()
+            .name("updatedatabasetable-quoted-column-identifiers")
+            .displayName("Quote Column Identifiers")
+            .description("Enabling this option will cause all column names to 
be quoted, allowing you to use reserved words as column names in your tables 
and/or forcing the "
+                    + "record field names to match the column names exactly.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor QUOTE_TABLE_IDENTIFIER = new 
PropertyDescriptor.Builder()
+            .name("updatedatabasetable-quoted-table-identifiers")
+            .displayName("Quote Table Identifiers")
+            .description("Enabling this option will cause the table name to be 
quoted to support the use of special characters in the table name and/or 
forcing the "
+                    + "value of the Table Name property to match the target 
table name exactly.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor QUERY_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("updatedatabasetable-query-timeout")
+            .displayName("Query Timeout")
+            .description("Sets the number of seconds the driver will wait for 
a query to execute. "
+                    + "A value of 0 means no timeout. NOTE: Non-zero values 
may not be supported by the driver.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor DB_TYPE;
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile containing records routed to this 
relationship after the record has been successfully transmitted to the 
database.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile containing records routed to this 
relationship if the record could not be transmitted to the database.")
+            .build();
+
+    protected static final Map<String, DatabaseAdapter> dbAdapters;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+    protected static Set<Relationship> relationships;
+
    static {
        dbAdapters = new HashMap<>();
        ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();

        // Discover every DatabaseAdapter implementation on the classpath; each one
        // becomes both a lookup entry and an allowable value for the DB_TYPE property
        ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
        dbAdapterLoader.forEach(databaseAdapter -> {
            dbAdapters.put(databaseAdapter.getName(), databaseAdapter);
            dbAdapterValues.add(new AllowableValue(databaseAdapter.getName(), databaseAdapter.getName(), databaseAdapter.getDescription()));
        });

        // DB_TYPE must be built here rather than at its declaration because its
        // allowable values depend on the adapters discovered above
        DB_TYPE = new PropertyDescriptor.Builder()
                .name("db-type")
                .displayName("Database Type")
                .description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
                        + "should suffice, but some databases (such as Oracle) require custom SQL clauses.")
                .allowableValues(dbAdapterValues.toArray(new AllowableValue[0]))
                .defaultValue("Generic")
                .required(false)
                .build();

        final Set<Relationship> r = new HashSet<>();
        r.add(REL_SUCCESS);
        r.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(r);

        // Property order here is the order presented in the UI
        final List<PropertyDescriptor> pds = new ArrayList<>();
        pds.add(RECORD_READER);
        pds.add(DBCP_SERVICE);
        pds.add(DB_TYPE);
        pds.add(CATALOG_NAME);
        pds.add(SCHEMA_NAME);
        pds.add(TABLE_NAME);
        pds.add(CREATE_TABLE);
        pds.add(PRIMARY_KEY_FIELDS);
        pds.add(TRANSLATE_FIELD_NAMES);
        pds.add(UPDATE_FIELD_NAMES);
        pds.add(RECORD_WRITER_FACTORY);
        pds.add(QUOTE_TABLE_IDENTIFIER);
        pds.add(QUOTE_COLUMN_IDENTIFIERS);
        pds.add(QUERY_TIMEOUT);
        propertyDescriptors = Collections.unmodifiableList(pds);
    }
+
    /**
     * @return the full, ordered list of supported property descriptors (built once in the static initializer)
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }
+
    /**
     * @return the set of relationships (success/failure) built once in the static initializer
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final List<ValidationResult> validationResults = new 
ArrayList<>(super.customValidate(validationContext));
+        final boolean recordWriterFactorySet = 
validationContext.getProperty(RECORD_WRITER_FACTORY).isSet();
+        final boolean updateFieldNames = 
validationContext.getProperty(UPDATE_FIELD_NAMES).asBoolean();
+
+        if (!recordWriterFactorySet && updateFieldNames) {
+            validationResults.add(new 
ValidationResult.Builder().subject(RECORD_WRITER_FACTORY.getDisplayName())
+                    .explanation("Record Writer must be set if 'Update Field 
Names' is true").valid(false).build());
+        }
+
+        final DatabaseAdapter databaseAdapter = 
dbAdapters.get(validationContext.getProperty(DB_TYPE).getValue());
+        final boolean createIfNotExists = 
CREATE_IF_NOT_EXISTS.getValue().equals(validationContext.getProperty(CREATE_TABLE).getValue());
+        if (createIfNotExists && 
!databaseAdapter.supportsCreateTableIfNotExists()) {
+            validationResults.add(new 
ValidationResult.Builder().subject(CREATE_TABLE.getDisplayName())
+                    .explanation("The specified Database Type does not support 
Create If Not Exists").valid(false).build());
+        }
+        return validationResults;
+    }
+
    /**
     * Reads the schema of the incoming FlowFile's records, then creates or alters
     * the target table to match it (via checkAndUpdateTableSchema). If 'Update Field
     * Names' is enabled and any field name differs from its column name, the
     * FlowFile content is rewritten with the column names as field names; otherwise
     * the FlowFile passes through unchanged. Routes to success on completion,
     * failure on error, and back to the input queue on DiscontinuedException.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // All EL-enabled properties are evaluated against this FlowFile's attributes
        final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory recordWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        final String catalogName = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String primaryKeyFields = context.getProperty(PRIMARY_KEY_FIELDS).evaluateAttributeExpressions(flowFile).getValue();
        final ComponentLog log = getLogger();

        try {
            final RecordReader reader;

            try (final InputStream in = session.read(flowFile)) {
                // if we fail to create the RecordReader then we want to route to failure, so we need to
                // handle this separately from the other IOExceptions which normally route to retry
                try {
                    reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
                } catch (Exception e) {
                    throw new ProcessException("Unable to create RecordReader", e);
                }
            } catch (ProcessException rrfe) {
                log.error(
                        "Failed to create {} for {} - routing to failure",
                        new Object[]{RecordReader.class.getSimpleName(), flowFile},
                        rrfe
                );
                // Since we are wrapping the exceptions above there should always be a cause
                // but it's possible it might not have a message. This handles that by logging
                // the name of the class thrown.
                Throwable c = rrfe.getCause();
                if (c != null) {
                    session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
                } else {
                    session.putAttribute(flowFile, "record.error.message", rrfe.getClass().getCanonicalName() + " Thrown");
                }
                session.transfer(flowFile, REL_FAILURE);
                return;
            }

            // Only the reader's schema is needed here; records themselves are not
            // consumed unless field names must be rewritten below
            final RecordSchema recordSchema = reader.getSchema();

            final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
            final boolean updateFieldNames = context.getProperty(UPDATE_FIELD_NAMES).asBoolean();
            final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
            if (recordWriterFactory == null && updateFieldNames) {
                throw new ProcessException("Record Writer must be set if 'Update Field Names' is true");
            }
            final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
            final DatabaseAdapter databaseAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
            try (final Connection connection = dbcpService.getConnection()) {
                final boolean quoteTableName = context.getProperty(QUOTE_TABLE_IDENTIFIER).asBoolean();
                final boolean quoteColumnNames = context.getProperty(QUOTE_COLUMN_IDENTIFIERS).asBoolean();
                final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());

                // If table may need to be created, parse the primary key field names and pass along
                final Set<String> primaryKeyColumnNames;
                if (createIfNotExists && primaryKeyFields != null) {
                    primaryKeyColumnNames = new HashSet<>();
                    Arrays.stream(primaryKeyFields.split(","))
                            .filter(path -> path != null && !path.trim().isEmpty())
                            .map(String::trim)
                            .forEach(primaryKeyColumnNames::add);
                } else {
                    primaryKeyColumnNames = null;
                }
                // Returns non-null only when field names need updating in the output
                final OutputMetadataHolder outputMetadataHolder = checkAndUpdateTableSchema(connection, databaseAdapter, recordSchema,
                        catalogName, schemaName, tableName, createIfNotExists, translateFieldNames, updateFieldNames, primaryKeyColumnNames, quoteTableName, quoteColumnNames);
                if (outputMetadataHolder != null) {
                    // The output schema changed (i.e. field names were updated), so write out the corresponding FlowFile
                    try {
                        final FlowFile inputFlowFile = flowFile;
                        flowFile = session.write(flowFile, (in, out) -> {

                            // if we fail to create the RecordReader then we want to route to failure, so we need to
                            // handle this separately from the other IOExceptions which normally route to retry
                            final RecordReader recordReader;
                            final RecordSetWriter recordSetWriter;
                            try {
                                recordReader = recordReaderFactory.createRecordReader(inputFlowFile, in, getLogger());
                                recordSetWriter = recordWriterFactory.createWriter(getLogger(), outputMetadataHolder.getOutputSchema(), out, attributes);
                            } catch (Exception e) {
                                if (e instanceof IOException) {
                                    throw (IOException) e;
                                }
                                throw new IOException("Unable to create RecordReader", e);
                            }

                            final WriteResult writeResult = updateRecords(recordSchema, outputMetadataHolder, recordReader, recordSetWriter);
                            recordSetWriter.flush();
                            recordSetWriter.close();
                            attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                            attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
                            attributes.putAll(writeResult.getAttributes());
                        });
                    } catch (final Exception e) {
                        getLogger().error("Failed to process {}; will route to failure", flowFile, e);
                        // Since we are wrapping the exceptions above there should always be a cause
                        // but it's possible it might not have a message. This handles that by logging
                        // the name of the class thrown.
                        final Throwable c = e.getCause();
                        if (c != null) {
                            session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
                        } else {
                            session.putAttribute(flowFile, "record.error.message", e.getClass().getCanonicalName() + " Thrown");
                        }
                        session.transfer(flowFile, REL_FAILURE);
                        return;
                    }

                }
                attributes.put(ATTR_OUTPUT_TABLE, tableName);
                flowFile = session.putAllAttributes(flowFile, attributes);
                // Table DDL was (possibly) executed against the remote DB; record provenance
                session.getProvenanceReporter().invokeRemoteProcess(flowFile, getJdbcUrl(connection));
                session.transfer(flowFile, REL_SUCCESS);
            }
        } catch (IOException | SQLException e) {
            flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
            log.error("Exception while processing {} - routing to failure", new Object[]{flowFile}, e);
            session.transfer(flowFile, REL_FAILURE);
        } catch (DiscontinuedException e) {
            // The input FlowFile processing is discontinued. Keep it in the input queue.
            getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
            session.transfer(flowFile, Relationship.SELF);
        } catch (Throwable t) {
            throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
        }
    }
+
    /**
     * Compares the record schema to the target table's current structure, creating
     * the table (when createIfNotExists) or issuing ALTER TABLE statements for any
     * missing columns. When updateFieldNames is true and at least one record field
     * name differs from its column name (case-insensitive match, different case),
     * returns an OutputMetadataHolder with the corrected output schema and a
     * field-name-to-column-name map; otherwise returns null.
     *
     * NOTE(review): declared synchronized, presumably to serialize DDL execution
     * across concurrent tasks of this processor -- confirm before removing.
     * NOTE(review): the trailing catch wraps every failure (including SQLException)
     * in IOException, which the caller routes to failure.
     *
     * @throws IOException if the table is missing and not to be created, or if any
     *         metadata/DDL operation fails
     */
    private synchronized OutputMetadataHolder checkAndUpdateTableSchema(final Connection conn, final DatabaseAdapter databaseAdapter, final RecordSchema schema,
                                                                        final String catalogName, final String schemaName, final String tableName,
                                                                        final boolean createIfNotExists, final boolean translateFieldNames, final boolean updateFieldNames,
                                                                        final Set<String> primaryKeyColumnNames, final boolean quoteTableName, final boolean quoteColumnNames) throws IOException {
        // Read in the current table metadata, compare it to the reader's schema, and
        // add any columns from the schema that are missing in the table
        try (final Statement s = conn.createStatement()) {
            // Determine whether the table exists
            TableSchema tableSchema = null;
            try {
                tableSchema = TableSchema.from(conn, catalogName, schemaName, tableName, translateFieldNames, null, getLogger());
            } catch (TableNotFoundException tnfe) {
                // Do nothing, the value will be populated if necessary
            }

            final List<ColumnDescription> columns = new ArrayList<>();
            boolean tableCreated = false;
            if (tableSchema == null) {
                if (createIfNotExists) {
                    // Create a TableSchema from the record, adding all columns
                    for (RecordField recordField : schema.getFields()) {
                        String recordFieldName = recordField.getFieldName();
                        // Assume a column to be created is required if there is a default value in the schema
                        final boolean required = (recordField.getDefaultValue() != null);
                        columns.add(new ColumnDescription(recordFieldName, DataTypeUtils.getSQLTypeValue(recordField.getDataType()), required, null, recordField.isNullable()));
                        getLogger().debug("Adding column " + recordFieldName + " to table " + tableName);
                    }

                    tableSchema = new TableSchema(tableName, columns, translateFieldNames, primaryKeyColumnNames, databaseAdapter.getColumnQuoteString());

                    final String createTableSql = databaseAdapter.getCreateTableStatement(tableSchema, quoteTableName, quoteColumnNames);

                    if (StringUtils.isNotEmpty(createTableSql)) {
                        // Perform the table create
                        getLogger().info("Executing DDL: " + createTableSql);
                        s.execute(createTableSql);
                    }

                    tableCreated = true;
                } else {
                    // The table wasn't found and is not to be created, so throw an error
                    throw new IOException("The table " + tableName + " could not be found in the database and the processor is configured not to create it.");
                }
            }

            // Normalized column names of the (existing or just-created) table, used
            // for membership checks below
            final List<String> dbColumns = new ArrayList<>();
            for (final ColumnDescription columnDescription : tableSchema.getColumnsAsList()) {
                dbColumns.add(ColumnDescription.normalizeColumnName(columnDescription.getColumnName(), translateFieldNames));
            }

            final List<ColumnDescription> columnsToAdd = new ArrayList<>();
            // If the table wasn't newly created, alter it accordingly
            if (!tableCreated) {
                // Handle new columns
                for (RecordField recordField : schema.getFields()) {
                    final String recordFieldName = recordField.getFieldName();
                    final String normalizedFieldName = ColumnDescription.normalizeColumnName(recordFieldName, translateFieldNames);
                    if (!dbColumns.contains(normalizedFieldName)) {
                        // The field does not exist in the table, add it
                        ColumnDescription columnToAdd = new ColumnDescription(recordFieldName, DataTypeUtils.getSQLTypeValue(recordField.getDataType()),
                                recordField.getDefaultValue() != null, null, recordField.isNullable());
                        columnsToAdd.add(columnToAdd);
                        getLogger().debug("Adding column " + recordFieldName + " to table " + tableName);
                    }
                }

                if (!columnsToAdd.isEmpty()) {
                    final List<String> alterTableSqlStatements = databaseAdapter.getAlterTableStatements(tableName, columnsToAdd, quoteTableName, quoteColumnNames);

                    if (alterTableSqlStatements != null && !alterTableSqlStatements.isEmpty()) {
                        for (String alterTableSql : alterTableSqlStatements) {
                            if (StringUtils.isEmpty(alterTableSql)) {
                                continue;
                            }
                            // Perform the table update
                            getLogger().info("Executing DDL: " + alterTableSql);
                            s.execute(alterTableSql);
                        }
                    }
                }
            }

            // If updating field names, return a new RecordSchema, otherwise return null
            final OutputMetadataHolder outputMetadataHolder;
            if (updateFieldNames) {
                final List<RecordField> inputRecordFields = schema.getFields();
                final List<RecordField> outputRecordFields = new ArrayList<>();
                final Map<String, String> fieldMap = new HashMap<>();
                boolean needsUpdating = false;

                for (RecordField inputRecordField : inputRecordFields) {
                    final String inputRecordFieldName = inputRecordField.getFieldName();
                    boolean found = false;
                    for (final String columnName : dbColumns) {
                        if (inputRecordFieldName.equalsIgnoreCase(columnName)) {
                            // Set a flag if the field name doesn't match the column name exactly. This overall flag will determine whether
                            // the records need updating (if true) or not (if false)
                            if (!inputRecordFieldName.equals(columnName)) {
                                needsUpdating = true;
                            }
                            fieldMap.put(inputRecordFieldName, columnName);
                            outputRecordFields.add(new RecordField(columnName, inputRecordField.getDataType(), inputRecordField.getDefaultValue(), inputRecordField.isNullable()));
                            found = true;
                            break;
                        }
                    }
                    if (!found) {
                        // If the input field wasn't a table column, add it back to the schema as-is
                        fieldMap.put(inputRecordFieldName, inputRecordFieldName);
                    }
                }
                outputMetadataHolder = needsUpdating ? new OutputMetadataHolder(new SimpleRecordSchema(outputRecordFields), fieldMap)
                        : null;
            } else {
                outputMetadataHolder = null;
            }
            return outputMetadataHolder;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
+
+    private synchronized WriteResult updateRecords(final RecordSchema 
inputRecordSchema, final OutputMetadataHolder outputMetadataHolder,
+                                                   final RecordReader reader, 
final RecordSetWriter writer) throws IOException {
+        try {
+            writer.beginRecordSet();
+            Record inputRecord;
+            while ((inputRecord = reader.nextRecord()) != null) {
+                List<RecordField> inputRecordFields = 
inputRecordSchema.getFields();
+                Map<String, Object> outputRecordFields = new 
HashMap<>(inputRecordFields.size());
+                // Copy values from input field name to output field name
+                for (Map.Entry<String, String> mapping : 
outputMetadataHolder.getFieldMap().entrySet()) {
+                    outputRecordFields.put(mapping.getValue(), 
inputRecord.getValue(mapping.getKey()));
+                }
+                final Record outputRecord = new 
MapRecord(outputMetadataHolder.getOutputSchema(), outputRecordFields);
+                writer.write(outputRecord);
+            }
+            return writer.finishRecordSet();
+
+        } catch (MalformedRecordException mre) {
+            throw new IOException("Error reading records: " + 
mre.getMessage(), mre);
+        }
+    }
+
+    private String getJdbcUrl(final Connection connection) {
+        try {
+            final DatabaseMetaData databaseMetaData = connection.getMetaData();
+            if (databaseMetaData != null) {
+                return databaseMetaData.getURL();
+            }
+        } catch (final Exception e) {
+            getLogger().warn("Could not determine JDBC URL based on the Driver 
Connection.", e);
+        }
+
+        return "DBCPService";
+    }
+
    /**
     * Immutable pairing of the updated output schema with the mapping from input
     * record field name to database column name, used when rewriting records so
     * that field names match the table's column names exactly.
     */
    private static class OutputMetadataHolder {
        private final RecordSchema outputSchema;
        private final Map<String, String> fieldMap;

        public OutputMetadataHolder(final RecordSchema outputSchema, final Map<String, String> fieldMap) {
            this.outputSchema = outputSchema;
            this.fieldMap = fieldMap;
        }

        public RecordSchema getOutputSchema() {
            return outputSchema;
        }

        public Map<String, String> getFieldMap() {
            return fieldMap;
        }
    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/ColumnDescription.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/ColumnDescription.java
new file mode 100644
index 0000000000..f628331be5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/ColumnDescription.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
/**
 * Immutable description of a single database column, as reported by
 * {@link java.sql.DatabaseMetaData#getColumns}.
 */
public class ColumnDescription {
    private final String columnName;
    private final int dataType;
    private final boolean required;
    private final Integer columnSize;
    private final boolean nullable;

    /**
     * @param columnName the column's name as reported by the database
     * @param dataType   the column's {@link java.sql.Types} code
     * @param required   whether a value must be supplied on insert (not nullable, not auto-increment, no default)
     * @param columnSize the column size, or null if the driver did not report one
     * @param nullable   whether the column accepts NULL values
     */
    public ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize, final boolean nullable) {
        this.columnName = columnName;
        this.dataType = dataType;
        this.required = required;
        this.columnSize = columnSize;
        this.nullable = nullable;
    }

    public int getDataType() {
        return dataType;
    }

    public String getColumnName() {
        return columnName;
    }

    public Integer getColumnSize() {
        return columnSize;
    }

    public boolean isRequired() {
        return required;
    }

    public boolean isNullable() {
        return nullable;
    }

    /**
     * Creates a ColumnDescription from the current row of a
     * {@link java.sql.DatabaseMetaData#getColumns} result set.
     *
     * @param resultSet a result set positioned at a column-description row
     * @return the ColumnDescription for the current row
     * @throws SQLException if a metadata column cannot be read
     */
    public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
        final ResultSetMetaData md = resultSet.getMetaData();
        final List<String> columns = new ArrayList<>();

        for (int i = 1; i < md.getColumnCount() + 1; i++) {
            columns.add(md.getColumnName(i));
        }
        // COLUMN_DEF must be read first to work around Oracle bug, see NIFI-4279 for details
        final String defaultValue = resultSet.getString("COLUMN_DEF");
        final String columnName = resultSet.getString("COLUMN_NAME");
        final int dataType = resultSet.getInt("DATA_TYPE");
        final int colSize = resultSet.getInt("COLUMN_SIZE");

        // Per DatabaseMetaData.getColumns, IS_NULLABLE is "YES", "NO", or "" (unknown).
        // Guard against null from non-conforming drivers and treat an unknown value as nullable.
        final String nullableValue = resultSet.getString("IS_NULLABLE");
        final boolean isNullable = nullableValue == null || nullableValue.isEmpty() || "YES".equalsIgnoreCase(nullableValue);

        // IS_AUTOINCREMENT is optional in the JDBC spec, so only read it if the driver provides it
        String autoIncrementValue = "NO";
        if (columns.contains("IS_AUTOINCREMENT")) {
            autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
        }

        final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
        // A value is required on insert only when the column is non-nullable, not auto-generated, and has no default
        final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;

        // A reported size of 0 means the driver did not supply one
        return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize, isNullable);
    }

    /**
     * Normalizes a column name for case/underscore-insensitive comparison.
     *
     * @param colName              the raw column name (may be null)
     * @param translateColumnNames when true, uppercases the name and strips underscores; when false, returns it unchanged
     * @return the normalized name, or null if the input was null
     */
    public static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
        return colName == null ? null : (translateColumnNames ? colName.toUpperCase().replace("_", "") : colName);
    }

    @Override
    public String toString() {
        return "Column[name=" + columnName + ", dataType=" + dataType + ", required=" + required + ", columnSize=" + columnSize + "]";
    }
}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
index 197131c59b..98803d66c7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
@@ -16,8 +16,12 @@
  */
 package org.apache.nifi.processors.standard.db;
 
+import java.sql.JDBCType;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Interface for RDBMS/JDBC-specific code.
@@ -91,11 +95,11 @@ public interface DatabaseAdapter {
      * <br /><br />
      * There is no standard way of doing this so not all adapters support it - 
use together with {@link #supportsUpsert()}!
      *
-     * @param table                     The name of the table in which to 
update/insert a record into.
-     * @param columnNames               The name of the columns in the table 
to add values to.
-     * @param uniqueKeyColumnNames      The name of the columns that form a 
unique key.
-     * @return                          A String containing the parameterized 
jdbc SQL statement.
-     *                                      The order and number of parameters 
are the same as that of the provided column list.
+     * @param table                The name of the table in which to 
update/insert a record into.
+     * @param columnNames          The name of the columns in the table to add 
values to.
+     * @param uniqueKeyColumnNames The name of the columns that form a unique 
key.
+     * @return A String containing the parameterized jdbc SQL statement.
+     * The order and number of parameters are the same as that of the provided 
column list.
      */
     default String getUpsertStatement(String table, List<String> columnNames, 
Collection<String> uniqueKeyColumnNames) {
         throw new UnsupportedOperationException("UPSERT is not supported for " 
+ getName());
@@ -122,6 +126,7 @@ public interface DatabaseAdapter {
      * <p>The default implementation of this method removes double quotes.
      * If the target database engine supports different escape characters, 
then its DatabaseAdapter implementation should override
      * this method so that such escape characters can be removed properly.</p>
+     *
      * @param identifier An identifier which may be wrapped with escape 
characters
      * @return An unwrapped identifier string, or null if the input identifier 
is null
      */
@@ -132,4 +137,83 @@ public interface DatabaseAdapter {
     default String getTableAliasClause(String tableName) {
         return "AS " + tableName;
     }
+
    /**
     * Returns the string used to quote table identifiers in generated SQL.
     *
     * @return the table quote string; the ANSI-standard double quote by default
     */
    default String getTableQuoteString() {
        // ANSI standard is a double quote
        return "\"";
    }

    /**
     * Returns the string used to quote column identifiers in generated SQL.
     *
     * @return the column quote string; the ANSI-standard double quote by default
     */
    default String getColumnQuoteString() {
        // ANSI standard is a double quote
        return "\"";
    }

    /**
     * Indicates whether this database dialect supports the CREATE TABLE IF NOT EXISTS syntax.
     *
     * @return true if the dialect supports IF NOT EXISTS; false by default
     */
    default boolean supportsCreateTableIfNotExists() {
        return false;
    }
+
+    /**
+     * Generates a CREATE TABLE statement using the specified table schema
+     * @param tableSchema The table schema including column information
+     * @param quoteTableName Whether to quote the table name in the generated 
DDL
+     * @param quoteColumnNames Whether to quote column names in the generated 
DDL
+     * @return A String containing DDL to create the specified table
+     */
+    default String getCreateTableStatement(TableSchema tableSchema, boolean 
quoteTableName, boolean quoteColumnNames) {
+        StringBuilder createTableStatement = new StringBuilder();
+
+        List<ColumnDescription> columns = tableSchema.getColumnsAsList();
+        List<String> columnsAndDatatypes = new ArrayList<>(columns.size());
+        Set<String> primaryKeyColumnNames = 
tableSchema.getPrimaryKeyColumnNames();
+        for (ColumnDescription column : columns) {
+            StringBuilder sb = new StringBuilder()
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(column.getColumnName())
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(" ")
+                    .append(getSQLForDataType(column.getDataType()))
+                    .append(column.isNullable() ? "" : " NOT NULL")
+                    .append(primaryKeyColumnNames != null && 
primaryKeyColumnNames.contains(column.getColumnName()) ? " PRIMARY KEY" : "");
+            columnsAndDatatypes.add(sb.toString());
+        }
+
+        createTableStatement.append("CREATE TABLE IF NOT EXISTS ")
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(tableSchema.getTableName())
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(" (")
+                .append(String.join(", ", columnsAndDatatypes))
+                .append(") ");
+
+        return createTableStatement.toString();
+    }
+
+    default List<String> getAlterTableStatements(String tableName, 
List<ColumnDescription> columnsToAdd, final boolean quoteTableName, final 
boolean quoteColumnNames) {
+        StringBuilder createTableStatement = new StringBuilder();
+
+        List<String> columnsAndDatatypes = new 
ArrayList<>(columnsToAdd.size());
+        for (ColumnDescription column : columnsToAdd) {
+            StringBuilder sb = new StringBuilder()
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(column.getColumnName())
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(" ")
+                    .append(getSQLForDataType(column.getDataType()));
+            columnsAndDatatypes.add(sb.toString());
+        }
+
+        createTableStatement.append("ALTER TABLE ")
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(tableName)
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(" ADD COLUMNS (")
+                .append(String.join(", ", columnsAndDatatypes))
+                .append(") ");
+
+        return Collections.singletonList(createTableStatement.toString());
+    }
+
    /**
     * Returns the SQL type name to use in generated DDL for the given {@link java.sql.Types} code.
     * The default implementation returns the standard JDBC type name.
     *
     * @param sqlType a {@link java.sql.Types} type code
     * @return the SQL type name for the given code
     * @throws IllegalArgumentException if sqlType does not correspond to a JDBCType constant
     */
    default String getSQLForDataType(int sqlType) {
        return JDBCType.valueOf(sqlType).getName();
    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableNotFoundException.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableNotFoundException.java
new file mode 100644
index 0000000000..c8d9804678
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableNotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db;
+
+import java.sql.SQLException;
+
/**
 * A marker exception used to distinguish a table not being found from other SQL exceptions,
 * so callers can react to a missing table (e.g. by creating it) without parsing vendor error codes.
 */
public class TableNotFoundException extends SQLException {
    // SQLException is Serializable; declare an explicit version id for serialization stability
    private static final long serialVersionUID = 1L;

    /**
     * @param s a message identifying the table that could not be found
     */
    public TableNotFoundException(String s) {
        super(s);
    }
}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
new file mode 100644
index 0000000000..70415dd934
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db;
+
+import org.apache.nifi.logging.ComponentLog;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TableSchema {
+    private final List<String> requiredColumnNames;
+    private final Set<String> primaryKeyColumnNames;
+    private final Map<String, ColumnDescription> columns;
+    private final String quotedIdentifierString;
+    private final String tableName;
+
+    public TableSchema(final String tableName, final List<ColumnDescription> 
columnDescriptions, final boolean translateColumnNames,
+                        final Set<String> primaryKeyColumnNames, final String 
quotedIdentifierString) {
+        this.tableName = tableName;
+        this.columns = new LinkedHashMap<>();
+        this.primaryKeyColumnNames = primaryKeyColumnNames;
+        this.quotedIdentifierString = quotedIdentifierString;
+
+        this.requiredColumnNames = new ArrayList<>();
+        for (final ColumnDescription desc : columnDescriptions) {
+            
columns.put(ColumnDescription.normalizeColumnName(desc.getColumnName(), 
translateColumnNames), desc);
+            if (desc.isRequired()) {
+                requiredColumnNames.add(desc.getColumnName());
+            }
+        }
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public Map<String, ColumnDescription> getColumns() {
+        return columns;
+    }
+
+    public List<ColumnDescription> getColumnsAsList() {
+        return new ArrayList<>(columns.values());
+    }
+
+    public List<String> getRequiredColumnNames() {
+        return requiredColumnNames;
+    }
+
+    public Set<String> getPrimaryKeyColumnNames() {
+        return primaryKeyColumnNames;
+    }
+
+    public String getQuotedIdentifierString() {
+        return quotedIdentifierString;
+    }
+
+    public static TableSchema from(final Connection conn, final String 
catalog, final String schema, final String tableName,
+                                   final boolean translateColumnNames, final 
String updateKeys, ComponentLog log) throws SQLException {
+        final DatabaseMetaData dmd = conn.getMetaData();
+
+        try (final ResultSet colrs = dmd.getColumns(catalog, schema, 
tableName, "%")) {
+            final List<ColumnDescription> cols = new ArrayList<>();
+            while (colrs.next()) {
+                final ColumnDescription col = ColumnDescription.from(colrs);
+                cols.add(col);
+            }
+            // If no columns are found, check that the table exists
+            if (cols.isEmpty()) {
+                try (final ResultSet tblrs = dmd.getTables(catalog, schema, 
tableName, null)) {
+                    List<String> qualifiedNameSegments = new ArrayList<>();
+                    if (catalog != null) {
+                        qualifiedNameSegments.add(catalog);
+                    }
+                    if (schema != null) {
+                        qualifiedNameSegments.add(schema);
+                    }
+                    if (tableName != null) {
+                        qualifiedNameSegments.add(tableName);
+                    }
+                    if (!tblrs.next()) {
+
+                        throw new TableNotFoundException("Table "
+                                + String.join(".", qualifiedNameSegments)
+                                + " not found, ensure the Catalog, Schema, 
and/or Table Names match those in the database exactly");
+                    } else {
+                        log.warn("Table "
+                                + String.join(".", qualifiedNameSegments)
+                                + " found but no columns were found, if this 
is not expected then check the user permissions for getting table metadata from 
the database");
+                    }
+                }
+            }
+
+            final Set<String> primaryKeyColumns = new HashSet<>();
+            if (updateKeys == null) {
+                try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, 
schema, tableName)) {
+
+                    while (pkrs.next()) {
+                        final String colName = pkrs.getString("COLUMN_NAME");
+                        
primaryKeyColumns.add(ColumnDescription.normalizeColumnName(colName, 
translateColumnNames));
+                    }
+                }
+            } else {
+                // Parse the Update Keys field and normalize the column names
+                for (final String updateKey : updateKeys.split(",")) {
+                    
primaryKeyColumns.add(ColumnDescription.normalizeColumnName(updateKey.trim(), 
translateColumnNames));
+                }
+            }
+
+            return new TableSchema(tableName, cols, translateColumnNames, 
primaryKeyColumns, dmd.getIdentifierQuoteString());
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "TableSchema[columns=" + columns.values() + "]";
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
index ec276fc651..60035b0e64 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
@@ -17,8 +17,13 @@
 package org.apache.nifi.processors.standard.db.impl;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.standard.db.ColumnDescription;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 /**
  * A database adapter that generates MS SQL Compatible SQL.
  */
@@ -104,4 +109,28 @@ public class MSSQLDatabaseAdapter implements 
DatabaseAdapter {
         // Remove double quotes and square brackets.
         return identifier == null ? null : identifier.replaceAll("[\"\\[\\]]", 
"");
     }
+
+    @Override
+    public List<String> getAlterTableStatements(final String tableName, final 
List<ColumnDescription> columnsToAdd, final boolean quoteTableName, final 
boolean quoteColumnNames) {
+        List<String> columnsAndDatatypes = new 
ArrayList<>(columnsToAdd.size());
+        for (ColumnDescription column : columnsToAdd) {
+            String dataType = getSQLForDataType(column.getDataType());
+            StringBuilder sb = new StringBuilder("ADD ")
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(column.getColumnName())
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(" ")
+                    .append(dataType);
+            columnsAndDatatypes.add(sb.toString());
+        }
+
+        StringBuilder alterTableStatement = new StringBuilder();
+        return Collections.singletonList(alterTableStatement.append("ALTER 
TABLE ")
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(tableName)
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(" ")
+                .append(String.join(", ", columnsAndDatatypes))
+                .toString());
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MySQLDatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MySQLDatabaseAdapter.java
index 6ce8a15f9b..baec18c57a 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MySQLDatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MySQLDatabaseAdapter.java
@@ -16,13 +16,28 @@
  */
 package org.apache.nifi.processors.standard.db.impl;
 
+import org.apache.nifi.processors.standard.db.ColumnDescription;
 import org.apache.nifi.util.StringUtils;
 
+import java.sql.JDBCType;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
+import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NCHAR;
+import static java.sql.Types.NCLOB;
+import static java.sql.Types.NVARCHAR;
+import static java.sql.Types.OTHER;
+import static java.sql.Types.SQLXML;
+import static java.sql.Types.VARCHAR;
+
 /**
  * A generic database adapter that generates MySQL compatible SQL.
  */
@@ -125,4 +140,64 @@ public class MySQLDatabaseAdapter extends 
GenericDatabaseAdapter {
                 .append("(").append(parameterizedInsertValues).append(")");
         return statementStringBuilder.toString();
     }
+
    /**
     * MySQL quotes table identifiers with backticks rather than the ANSI double quote.
     */
    @Override
    public String getTableQuoteString() {
        return "`";
    }

    /**
     * MySQL quotes column identifiers with backticks rather than the ANSI double quote.
     */
    @Override
    public String getColumnQuoteString() {
        return "`";
    }

    /**
     * MySQL supports the CREATE TABLE IF NOT EXISTS syntax.
     */
    @Override
    public boolean supportsCreateTableIfNotExists() {
        return true;
    }
+
+    @Override
+    public List<String> getAlterTableStatements(final String tableName, final 
List<ColumnDescription> columnsToAdd, final boolean quoteTableName, final 
boolean quoteColumnNames) {
+        List<String> columnsAndDatatypes = new 
ArrayList<>(columnsToAdd.size());
+        for (ColumnDescription column : columnsToAdd) {
+            String dataType = getSQLForDataType(column.getDataType());
+            StringBuilder sb = new StringBuilder("ADD COLUMN ")
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(column.getColumnName())
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(" ")
+                    .append(dataType);
+            columnsAndDatatypes.add(sb.toString());
+        }
+
+        StringBuilder alterTableStatement = new StringBuilder();
+        return Collections.singletonList(alterTableStatement.append("ALTER 
TABLE ")
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(tableName)
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(" ")
+                .append(String.join(", ", columnsAndDatatypes))
+                .toString());
+    }
+
+    @Override
+    public String getSQLForDataType(int sqlType) {
+        switch (sqlType) {
+            case Types.DOUBLE:
+                return "DOUBLE PRECISION";
+            case CHAR:
+            case LONGNVARCHAR:
+            case LONGVARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+            case VARCHAR:
+            case CLOB:
+            case NCLOB:
+            case OTHER:
+            case SQLXML:
+                return "TEXT";
+            default:
+                return JDBCType.valueOf(sqlType).getName();
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
index 990831f6b6..5bd5556743 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
@@ -16,14 +16,29 @@
  */
 package org.apache.nifi.processors.standard.db.impl;
 
+import java.sql.JDBCType;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.standard.db.ColumnDescription;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 
+import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
+import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NCHAR;
+import static java.sql.Types.NCLOB;
+import static java.sql.Types.NVARCHAR;
+import static java.sql.Types.OTHER;
+import static java.sql.Types.SQLXML;
+import static java.sql.Types.VARCHAR;
+
 public class Oracle12DatabaseAdapter implements DatabaseAdapter {
     @Override
     public String getName() {
@@ -189,4 +204,53 @@ public class Oracle12DatabaseAdapter implements 
DatabaseAdapter {
         return table + "." + columnName + " = " + newTableAlias + "." + 
columnName;
     }
 
+    @Override
+    public List<String> getAlterTableStatements(String tableName, 
List<ColumnDescription> columnsToAdd, final boolean quoteTableName, final 
boolean quoteColumnNames) {
+        StringBuilder createTableStatement = new StringBuilder();
+
+        List<String> columnsAndDatatypes = new 
ArrayList<>(columnsToAdd.size());
+        for (ColumnDescription column : columnsToAdd) {
+            String dataType = getSQLForDataType(column.getDataType());
+            StringBuilder sb = new StringBuilder()
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(column.getColumnName())
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(" ")
+                    .append(dataType);
+            columnsAndDatatypes.add(sb.toString());
+        }
+
+        createTableStatement.append("ALTER TABLE ")
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(tableName)
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(" ADD (")
+                .append(String.join(", ", columnsAndDatatypes))
+                .append(") ");
+
+        return Collections.singletonList(createTableStatement.toString());
+    }
+
+    @Override
+    public String getSQLForDataType(int sqlType) {
+        switch (sqlType) {
+            case Types.DOUBLE:
+                return "DOUBLE PRECISION";
+            case CHAR:
+            case LONGNVARCHAR:
+            case LONGVARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+            case VARCHAR:
+            case CLOB:
+            case NCLOB:
+            case OTHER:
+            case SQLXML:
+                // Must have a max length specified (the Oracle docs say 
2000), and use VARCHAR2 instead of VARCHAR for consistent comparison semantics
+                return "VARCHAR2(2000)";
+            default:
+                return JDBCType.valueOf(sqlType).getName();
+        }
+    }
+
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
index bfaac8bcbb..511ce78f1b 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
@@ -17,8 +17,26 @@
 package org.apache.nifi.processors.standard.db.impl;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.standard.db.ColumnDescription;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 
+import java.sql.JDBCType;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
+import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NCHAR;
+import static java.sql.Types.NCLOB;
+import static java.sql.Types.NVARCHAR;
+import static java.sql.Types.OTHER;
+import static java.sql.Types.SQLXML;
+import static java.sql.Types.VARCHAR;
+
 /**
  * A DatabaseAdapter that generates Oracle-compliant SQL.
  */
@@ -107,4 +125,53 @@ public class OracleDatabaseAdapter implements 
DatabaseAdapter {
     public String getTableAliasClause(String tableName) {
         return tableName;
     }
+
+    @Override
+    public List<String> getAlterTableStatements(String tableName, 
List<ColumnDescription> columnsToAdd, final boolean quoteTableName, final 
boolean quoteColumnNames) {
+        StringBuilder createTableStatement = new StringBuilder();
+
+        List<String> columnsAndDatatypes = new 
ArrayList<>(columnsToAdd.size());
+        for (ColumnDescription column : columnsToAdd) {
+            String dataType = getSQLForDataType(column.getDataType());
+            StringBuilder sb = new StringBuilder()
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(column.getColumnName())
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(" ")
+                    .append(dataType);
+            columnsAndDatatypes.add(sb.toString());
+        }
+
+        createTableStatement.append("ALTER TABLE ")
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(tableName)
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(" ADD (")
+                .append(String.join(", ", columnsAndDatatypes))
+                .append(") ");
+
+        return Collections.singletonList(createTableStatement.toString());
+    }
+
+    @Override
+    public String getSQLForDataType(int sqlType) {
+        switch (sqlType) {
+            case Types.DOUBLE:
+                return "DOUBLE PRECISION";
+            case CHAR:
+            case LONGNVARCHAR:
+            case LONGVARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+            case VARCHAR:
+            case CLOB:
+            case NCLOB:
+            case OTHER:
+            case SQLXML:
+                // Must have a max length specified (the Oracle docs say 
2000), and use VARCHAR2 instead of VARCHAR for consistent comparison semantics
+                return "VARCHAR2(2000)";
+            default:
+                return JDBCType.valueOf(sqlType).getName();
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
index a70d88a472..c19efe51ca 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
@@ -17,8 +17,26 @@
 package org.apache.nifi.processors.standard.db.impl;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.standard.db.ColumnDescription;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 
+import java.sql.JDBCType;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
+import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NCHAR;
+import static java.sql.Types.NCLOB;
+import static java.sql.Types.NVARCHAR;
+import static java.sql.Types.OTHER;
+import static java.sql.Types.SQLXML;
+import static java.sql.Types.VARCHAR;
+
 /**
  * A Apache Phoenix database adapter that generates ANSI SQL.
  */
@@ -87,4 +105,54 @@ public final class PhoenixDatabaseAdapter implements DatabaseAdapter {
         }
         return query.toString();
     }
+
+    @Override
+    public boolean supportsCreateTableIfNotExists() {
+        return true;
+    }
+
+    @Override
+    public List<String> getAlterTableStatements(final String tableName, final List<ColumnDescription> columnsToAdd, final boolean quoteTableName, final boolean quoteColumnNames) {
+        List<String> columnsAndDatatypes = new ArrayList<>(columnsToAdd.size());
+        for (ColumnDescription column : columnsToAdd) {
+            String dataType = getSQLForDataType(column.getDataType());
+            StringBuilder sb = new StringBuilder("ADD COLUMN ")
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(column.getColumnName())
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(" ")
+                    .append(dataType);
+            columnsAndDatatypes.add(sb.toString());
+        }
+
+        StringBuilder alterTableStatement = new StringBuilder();
+        return Collections.singletonList(alterTableStatement.append("ALTER TABLE ")
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(tableName)
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(" ")
+                .append(String.join(", ", columnsAndDatatypes))
+                .toString());
+    }
+
+    @Override
+    public String getSQLForDataType(int sqlType) {
+        switch (sqlType) {
+            case Types.DOUBLE:
+                return "DOUBLE PRECISION";
+            case CHAR:
+            case LONGNVARCHAR:
+            case LONGVARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+            case VARCHAR:
+            case CLOB:
+            case NCLOB:
+            case OTHER:
+            case SQLXML:
+                return "TEXT";
+            default:
+                return JDBCType.valueOf(sqlType).getName();
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java
index 21c46c3fba..5e48818600 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java
@@ -16,12 +16,28 @@
  */
 package org.apache.nifi.processors.standard.db.impl;
 
+import org.apache.nifi.processors.standard.db.ColumnDescription;
 import org.apache.nifi.util.StringUtils;
 
+import java.sql.JDBCType;
+import java.sql.Types;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
+import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NCHAR;
+import static java.sql.Types.NCLOB;
+import static java.sql.Types.NVARCHAR;
+import static java.sql.Types.OTHER;
+import static java.sql.Types.SQLXML;
+import static java.sql.Types.VARCHAR;
+
 public class PostgreSQLDatabaseAdapter extends GenericDatabaseAdapter {
     @Override
     public String getName() {
@@ -56,29 +72,29 @@ public class PostgreSQLDatabaseAdapter extends GenericDatabaseAdapter {
         }
 
         String columns = columnNames.stream()
-            .collect(Collectors.joining(", "));
+                .collect(Collectors.joining(", "));
 
         String parameterizedInsertValues = columnNames.stream()
-            .map(__ -> "?")
-            .collect(Collectors.joining(", "));
+                .map(__ -> "?")
+                .collect(Collectors.joining(", "));
 
         String updateValues = columnNames.stream()
-            .map(columnName -> "EXCLUDED." + columnName)
-            .collect(Collectors.joining(", "));
+                .map(columnName -> "EXCLUDED." + columnName)
+                .collect(Collectors.joining(", "));
 
         String conflictClause = "(" + uniqueKeyColumnNames.stream().collect(Collectors.joining(", ")) + ")";
 
         StringBuilder statementStringBuilder = new StringBuilder("INSERT INTO ")
-            .append(table)
-            .append("(").append(columns).append(")")
-            .append(" VALUES ")
-            .append("(").append(parameterizedInsertValues).append(")")
-            .append(" ON CONFLICT ")
-            .append(conflictClause)
-            .append(" DO UPDATE SET ")
-            .append("(").append(columns).append(")")
-            .append(" = ")
-            .append("(").append(updateValues).append(")");
+                .append(table)
+                .append("(").append(columns).append(")")
+                .append(" VALUES ")
+                .append("(").append(parameterizedInsertValues).append(")")
+                .append(" ON CONFLICT ")
+                .append(conflictClause)
+                .append(" DO UPDATE SET ")
+                .append("(").append(columns).append(")")
+                .append(" = ")
+                .append("(").append(updateValues).append(")");
 
         return statementStringBuilder.toString();
     }
@@ -115,5 +131,53 @@ public class PostgreSQLDatabaseAdapter extends GenericDatabaseAdapter {
         return statementStringBuilder.toString();
     }
 
+    @Override
+    public boolean supportsCreateTableIfNotExists() {
+        return true;
+    }
+
+    @Override
+    public List<String> getAlterTableStatements(final String tableName, final List<ColumnDescription> columnsToAdd, final boolean quoteTableName, final boolean quoteColumnNames) {
+        List<String> columnsAndDatatypes = new ArrayList<>(columnsToAdd.size());
+        for (ColumnDescription column : columnsToAdd) {
+            String dataType = getSQLForDataType(column.getDataType());
+            StringBuilder sb = new StringBuilder("ADD COLUMN ")
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(column.getColumnName())
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(" ")
+                    .append(dataType);
+            columnsAndDatatypes.add(sb.toString());
+        }
 
+        StringBuilder alterTableStatement = new StringBuilder();
+        return Collections.singletonList(alterTableStatement.append("ALTER TABLE ")
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(tableName)
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(" ")
+                .append(String.join(", ", columnsAndDatatypes))
+                .toString());
+    }
+
+    @Override
+    public String getSQLForDataType(int sqlType) {
+        switch (sqlType) {
+            case Types.DOUBLE:
+                return "DOUBLE PRECISION";
+            case CHAR:
+            case LONGNVARCHAR:
+            case LONGVARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+            case VARCHAR:
+            case CLOB:
+            case NCLOB:
+            case OTHER:
+            case SQLXML:
+                return "TEXT";
+            default:
+                return JDBCType.valueOf(sqlType).getName();
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 6934c803df..6a336d141a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -123,6 +123,7 @@ org.apache.nifi.processors.standard.TailFile
 org.apache.nifi.processors.standard.TransformXml
 org.apache.nifi.processors.standard.UnpackContent
 org.apache.nifi.processors.standard.UpdateCounter
+org.apache.nifi.processors.standard.UpdateDatabaseTable
 org.apache.nifi.processors.standard.UpdateRecord
 org.apache.nifi.processors.standard.ValidateCsv
 org.apache.nifi.processors.standard.ValidateJson
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index bc88542531..182e86eaf5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -19,6 +19,8 @@ package org.apache.nifi.processors.standard;
 import org.apache.commons.dbcp2.DelegatingConnection;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processors.standard.db.ColumnDescription;
+import org.apache.nifi.processors.standard.db.TableSchema;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -215,14 +217,15 @@ public class PutDatabaseRecordTest {
                 new RecordField("non_existing", 
RecordFieldType.BOOLEAN.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        final PutDatabaseRecord.TableSchema tableSchema = new 
PutDatabaseRecord.TableSchema(
+        final TableSchema tableSchema = new TableSchema(
+                "PERSONS",
                 Arrays.asList(
-                        new PutDatabaseRecord.ColumnDescription("id", 4, true, 
2, false),
-                        new PutDatabaseRecord.ColumnDescription("name", 12, 
true, 255, true),
-                        new PutDatabaseRecord.ColumnDescription("code", 4, 
true, 10, true)
+                        new ColumnDescription("id", 4, true, 2, false),
+                        new ColumnDescription("name", 12, true, 255, true),
+                        new ColumnDescription("code", 4, true, 10, true)
                 ),
                 false,
-                new HashSet<String>(Arrays.asList("id")),
+                new HashSet<>(Arrays.asList("id")),
                 ""
         );
 
@@ -242,7 +245,7 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testGeneratePreparedStatementsFailUnmatchedField() throws 
SQLException, MalformedRecordException {
+    void testGeneratePreparedStatementsFailUnmatchedField() {
 
         final List<RecordField> fields = Arrays.asList(new RecordField("id", 
RecordFieldType.INT.getDataType()),
                 new RecordField("name", RecordFieldType.STRING.getDataType()),
@@ -250,14 +253,15 @@ public class PutDatabaseRecordTest {
                 new RecordField("non_existing", 
RecordFieldType.BOOLEAN.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        final PutDatabaseRecord.TableSchema tableSchema = new 
PutDatabaseRecord.TableSchema(
+        final TableSchema tableSchema = new TableSchema(
+                "PERSONS",
                 Arrays.asList(
-                        new PutDatabaseRecord.ColumnDescription("id", 4, true, 
2, false),
-                        new PutDatabaseRecord.ColumnDescription("name", 12, 
true, 255, true),
-                        new PutDatabaseRecord.ColumnDescription("code", 4, 
true, 10, true)
+                        new ColumnDescription("id", 4, true, 2, false),
+                        new ColumnDescription("name", 12, true, 255, true),
+                        new ColumnDescription("code", 4, true, 10, true)
                 ),
                 false,
-                new HashSet<String>(Arrays.asList("id")),
+                new HashSet<>(Arrays.asList("id")),
                 ""
         );
 
@@ -1364,11 +1368,12 @@ public class PutDatabaseRecordTest {
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        final PutDatabaseRecord.TableSchema tableSchema = new 
PutDatabaseRecord.TableSchema(
+        final TableSchema tableSchema = new TableSchema(
+                "PERSONS",
                 Arrays.asList(
-                        new PutDatabaseRecord.ColumnDescription("id", 4, true, 
2, false),
-                        new PutDatabaseRecord.ColumnDescription("name", 12, 
true, 255, true),
-                        new PutDatabaseRecord.ColumnDescription("code", 4, 
true, 10, true)
+                        new ColumnDescription("id", 4, true, 2, false),
+                        new ColumnDescription("name", 12, true, 255, true),
+                        new ColumnDescription("code", 4, true, 10, true)
                 ),
                 false,
                 new HashSet<>(Arrays.asList("id")),
@@ -1382,13 +1387,11 @@ public class PutDatabaseRecordTest {
         runner.setProperty(PutDatabaseRecord.QUOTE_TABLE_IDENTIFIER, "true");
         final PutDatabaseRecord.DMLSettings settings = new 
PutDatabaseRecord.DMLSettings(runner.getProcessContext());
 
-
-        assertEquals("test_catalog.test_schema.test_table",
-                processor.generateTableName(settings, "test_catalog", 
"test_schema", "test_table", tableSchema));
+        assertEquals("test_catalog.test_schema.test_table", 
processor.generateTableName(settings, "test_catalog", "test_schema", 
"test_table", tableSchema));
     }
 
     @Test
-    void testInsertMismatchedCompatibleDataTypes() throws 
InitializationException, ProcessException, SQLException, IOException {
+    void testInsertMismatchedCompatibleDataTypes() throws 
InitializationException, ProcessException, SQLException {
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
@@ -1468,11 +1471,7 @@ public class PutDatabaseRecordTest {
         parser.addSchemaField("dt", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.FLOAT.getDataType()).getFieldType());
 
         LocalDate testDate1 = LocalDate.of(2021, 1, 26);
-        BigInteger nifiDate1 = 
BigInteger.valueOf(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
 // in UTC
-        Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
         LocalDate testDate2 = LocalDate.of(2021, 7, 26);
-        BigInteger nifiDate2 = 
BigInteger.valueOf(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
 // in UTC
-        Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ
 
         parser.addRecord("1", "rec1", 101, Arrays.asList(1.0, 2.0));
         parser.addRecord("2", "rec2", 102, Arrays.asList(3.0, 4.0));
@@ -1499,7 +1498,7 @@ public class PutDatabaseRecordTest {
         final Statement stmt = conn.createStatement();
         try {
             stmt.execute("DROP TABLE TEMP");
-        } catch(final Exception e) {
+        } catch (final Exception e) {
             // Do nothing, table may not exist
         }
         stmt.execute("CREATE TABLE TEMP (id integer primary key, name long 
varchar)");
@@ -1536,7 +1535,7 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    void testInsertWithDifferentColumnOrdering() throws 
InitializationException, ProcessException, SQLException, IOException {
+    void testInsertWithDifferentColumnOrdering() throws 
InitializationException, ProcessException, SQLException {
         // Manually create and drop the tables and schemas
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
@@ -1595,7 +1594,7 @@ public class PutDatabaseRecordTest {
         byte[] bytes = "BLOB".getBytes();
         Byte[] blobRecordValue = new Byte[bytes.length];
         for (int i = 0; i < bytes.length; i++) {
-            blobRecordValue[i] = Byte.valueOf(bytes[i]);
+            blobRecordValue[i] = bytes[i];
         }
 
         parser.addSchemaField("id", RecordFieldType.INT);
@@ -1857,12 +1856,10 @@ public class PutDatabaseRecordTest {
         return () -> spyStmt[0];
     }
 
-
-
     static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
         @Override
         SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String 
tableName, TableSchema tableSchema, DMLSettings settings) throws 
IllegalArgumentException {
-            return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES 
(?,?,?,?)", Arrays.asList(0,1,2,3));
+            return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES 
(?,?,?,?)", Arrays.asList(0, 1, 2, 3));
         }
     }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java
new file mode 100644
index 0000000000..058c9f6555
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateDatabaseTable.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.db.impl.DerbyDatabaseAdapter;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestUpdateDatabaseTable {
+
+    private static final String createPersons = "CREATE TABLE \"persons\" (\"id\" integer primary key, \"name\" varchar(100), \"code\" integer)";
+
+    @TempDir
+    public static File tempDir;
+
+    private static String derbyErrorFile;
+
+    private TestRunner runner;
+    private UpdateDatabaseTable processor;
+    private static DBCPService service;
+
+    @BeforeAll
+    public static void setupClass() throws ProcessException {
+        derbyErrorFile = System.getProperty("derby.stream.error.file", "");
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+        final File dbDir = new File(tempDir, "db");
+        service = new MockDBCPService(dbDir.getAbsolutePath());
+    }
+
+    @AfterAll
+    public static void restoreDefaults() {
+        System.setProperty("derby.stream.error.file", derbyErrorFile);
+        final File dbDir = new File(tempDir, "db");
+        dbDir.deleteOnExit();
+        try {
+            DriverManager.getConnection("jdbc:derby:" + dbDir + ";shutdown=true");
+        } catch (SQLException sqle) {
+            // Ignore, most likely the DB has already been shutdown
+        }
+    }
+
+    @BeforeEach
+    public void setup() {
+        processor = new UpdateDatabaseTable();
+
+        try (Statement s = service.getConnection().createStatement()) {
+            s.execute("DROP TABLE \"persons\"");
+        } catch (SQLException se) {
+            // Ignore, table probably doesn't exist
+        }
+    }
+
+    @Test
+    public void testCreateTable() throws Exception {
+        runner = TestRunners.newTestRunner(processor);
+        MockRecordParser readerFactory = new MockRecordParser();
+
+        readerFactory.addSchemaField(new RecordField("id", 
RecordFieldType.INT.getDataType(), false));
+        readerFactory.addSchemaField(new RecordField("name", 
RecordFieldType.STRING.getDataType(), true));
+        readerFactory.addSchemaField(new RecordField("code", 
RecordFieldType.INT.getDataType(), 0, true));
+        readerFactory.addSchemaField(new RecordField("newField", 
RecordFieldType.STRING.getDataType(), 0, true));
+        readerFactory.addRecord(1, "name1", 10);
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+
+        runner.setProperty(UpdateDatabaseTable.RECORD_READER, 
"mock-reader-factory");
+        runner.setProperty(UpdateDatabaseTable.TABLE_NAME, "${table.name}");
+        runner.setProperty(UpdateDatabaseTable.CREATE_TABLE, 
UpdateDatabaseTable.CREATE_IF_NOT_EXISTS);
+        runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER, 
"false");
+        runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS, 
"true");
+        runner.setProperty(UpdateDatabaseTable.DB_TYPE, new 
DerbyDatabaseAdapter().getName());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("db.name", "default");
+        attrs.put("table.name", "newTable");
+        runner.enqueue(new byte[0], attrs);
+        runner.run();
+
+        runner.assertTransferCount(UpdateDatabaseTable.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(UpdateDatabaseTable.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(UpdateDatabaseTable.ATTR_OUTPUT_TABLE, 
"newTable");
+        // Verify the table has been created with the expected fields
+        try (Statement s = service.getConnection().createStatement()) {
+            // The Derby equivalent of DESCRIBE TABLE (using a query rather 
than the ij tool)
+            ResultSet rs = s.executeQuery("select * from sys.syscolumns where 
referenceid = (select tableid from sys.systables where tablename = 'NEWTABLE') 
order by columnnumber");
+            assertTrue(rs.next());
+            // Columns 2,3,4 are Column Name, Column Index, and Column Type
+            assertEquals("id", rs.getString(2));
+            assertEquals(1, rs.getInt(3));
+            assertEquals("INTEGER NOT NULL", rs.getString(4));
+
+            assertTrue(rs.next());
+            assertEquals("name", rs.getString(2));
+            assertEquals(2, rs.getInt(3));
+            assertEquals("VARCHAR(100)", rs.getString(4));
+
+            assertTrue(rs.next());
+            assertEquals("code", rs.getString(2));
+            assertEquals(3, rs.getInt(3));
+            assertEquals("INTEGER", rs.getString(4));
+
+            assertTrue(rs.next());
+            assertEquals("newField", rs.getString(2));
+            assertEquals(4, rs.getInt(3));
+            assertEquals("VARCHAR(100)", rs.getString(4));
+
+            // No more rows
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testAddColumnToExistingTable() throws Exception {
+        runner = TestRunners.newTestRunner(processor);
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+
+            MockRecordParser readerFactory = new MockRecordParser();
+
+            readerFactory.addSchemaField(new RecordField("id", 
RecordFieldType.INT.getDataType(), false));
+            readerFactory.addSchemaField(new RecordField("name", 
RecordFieldType.STRING.getDataType(), true));
+            readerFactory.addSchemaField(new RecordField("code", 
RecordFieldType.INT.getDataType(), 0, true));
+            readerFactory.addSchemaField(new RecordField("newField", 
RecordFieldType.STRING.getDataType(), 0, true));
+            readerFactory.addRecord(1, "name1", null, "test");
+
+            runner.addControllerService("mock-reader-factory", readerFactory);
+            runner.enableControllerService(readerFactory);
+
+            runner.setProperty(UpdateDatabaseTable.RECORD_READER, 
"mock-reader-factory");
+            runner.setProperty(UpdateDatabaseTable.TABLE_NAME, 
"${table.name}");
+            runner.setProperty(UpdateDatabaseTable.CREATE_TABLE, 
UpdateDatabaseTable.FAIL_IF_NOT_EXISTS);
+            runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER, 
"true");
+            runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS, 
"false");
+            runner.setProperty(UpdateDatabaseTable.DB_TYPE, new 
DerbyDatabaseAdapter().getName());
+            runner.addControllerService("dbcp", service);
+            runner.enableControllerService(service);
+            runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
+            Map<String, String> attrs = new HashMap<>();
+            attrs.put("db.name", "default");
+            attrs.put("table.name", "persons");
+            runner.enqueue(new byte[0], attrs);
+            runner.run();
+
+            runner.assertTransferCount(UpdateDatabaseTable.REL_SUCCESS, 1);
+            final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(UpdateDatabaseTable.REL_SUCCESS).get(0);
+            
flowFile.assertAttributeEquals(UpdateDatabaseTable.ATTR_OUTPUT_TABLE, 
"persons");
+            // Verify the table has been updated with the expected field(s)
+            try (Statement s = conn.createStatement()) {
+                // The Derby equivalent of DESCRIBE TABLE (using a query 
rather than the ij tool)
+                ResultSet rs = s.executeQuery("SELECT * FROM SYS.SYSCOLUMNS 
WHERE referenceid = (SELECT tableid FROM SYS.SYSTABLES WHERE tablename = 
'persons') ORDER BY columnnumber");
+                assertTrue(rs.next());
+                // Columns 2,3,4 are Column Name, Column Index, and Column Type
+                assertEquals("id", rs.getString(2));
+                assertEquals(1, rs.getInt(3));
+                // Primary key cannot be null, Derby stores that in this column
+                assertEquals("INTEGER NOT NULL", rs.getString(4));
+
+                assertTrue(rs.next());
+                assertEquals("name", rs.getString(2));
+                assertEquals(2, rs.getInt(3));
+                assertEquals("VARCHAR(100)", rs.getString(4));
+
+                assertTrue(rs.next());
+                assertEquals("code", rs.getString(2));
+                assertEquals(3, rs.getInt(3));
+                assertEquals("INTEGER", rs.getString(4));
+
+                assertTrue(rs.next());
+                assertEquals("NEWFIELD", rs.getString(2));
+                assertEquals(4, rs.getInt(3));
+                assertEquals("VARCHAR(100)", rs.getString(4));
+
+                // No more rows
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
+    public void testAddExistingColumnTranslateFieldNames() throws Exception {
+        runner = TestRunners.newTestRunner(processor);
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+
+            MockRecordParser readerFactory = new MockRecordParser();
+
+            readerFactory.addSchemaField(new RecordField("ID", 
RecordFieldType.INT.getDataType(), false));
+            readerFactory.addSchemaField(new RecordField("NAME", 
RecordFieldType.STRING.getDataType(), true));
+            readerFactory.addSchemaField(new RecordField("CODE", 
RecordFieldType.INT.getDataType(), 0, true));
+            readerFactory.addRecord(1, "name1", null, "test");
+
+            runner.addControllerService("mock-reader-factory", readerFactory);
+            runner.enableControllerService(readerFactory);
+
+            runner.setProperty(UpdateDatabaseTable.RECORD_READER, 
"mock-reader-factory");
+            runner.setProperty(UpdateDatabaseTable.TABLE_NAME, 
"${table.name}");
+            runner.setProperty(UpdateDatabaseTable.CREATE_TABLE, 
UpdateDatabaseTable.FAIL_IF_NOT_EXISTS);
+            runner.setProperty(UpdateDatabaseTable.TRANSLATE_FIELD_NAMES, 
"true");
+            runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER, 
"true");
+            runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS, 
"false");
+            runner.setProperty(UpdateDatabaseTable.DB_TYPE, new 
DerbyDatabaseAdapter().getName());
+            runner.addControllerService("dbcp", service);
+            runner.enableControllerService(service);
+            runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
+            Map<String, String> attrs = new HashMap<>();
+            attrs.put("db.name", "default");
+            attrs.put("table.name", "persons");
+            runner.enqueue(new byte[0], attrs);
+            runner.run();
+
+            runner.assertTransferCount(UpdateDatabaseTable.REL_SUCCESS, 1);
+            final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(UpdateDatabaseTable.REL_SUCCESS).get(0);
+            
flowFile.assertAttributeEquals(UpdateDatabaseTable.ATTR_OUTPUT_TABLE, 
"persons");
+            // Verify the table has been updated with the expected field(s)
+            try (Statement s = conn.createStatement()) {
+                // The Derby equivalent of DESCRIBE TABLE (using a query 
rather than the ij tool)
+                ResultSet rs = s.executeQuery("SELECT * FROM SYS.SYSCOLUMNS 
WHERE referenceid = (SELECT tableid FROM SYS.SYSTABLES WHERE tablename = 
'persons') ORDER BY columnnumber");
+                assertTrue(rs.next());
+                // Columns 2,3,4 are Column Name, Column Index, and Column Type
+                assertEquals("id", rs.getString(2));
+                assertEquals(1, rs.getInt(3));
+                // Primary key cannot be null, Derby stores that in this column
+                assertEquals("INTEGER NOT NULL", rs.getString(4));
+
+                assertTrue(rs.next());
+                assertEquals("name", rs.getString(2));
+                assertEquals(2, rs.getInt(3));
+                assertEquals("VARCHAR(100)", rs.getString(4));
+
+                assertTrue(rs.next());
+                assertEquals("code", rs.getString(2));
+                assertEquals(3, rs.getInt(3));
+                assertEquals("INTEGER", rs.getString(4));
+
+                // No more rows
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
+    public void testAddExistingColumnNoTranslateFieldNames() throws Exception {
+        runner = TestRunners.newTestRunner(processor);
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+                stmt.execute("ALTER TABLE \"persons\" ADD COLUMN \"ID\" 
INTEGER");
+            }
+
+            MockRecordParser readerFactory = new MockRecordParser();
+
+            readerFactory.addSchemaField(new RecordField("ID", 
RecordFieldType.INT.getDataType(), false));
+            readerFactory.addSchemaField(new RecordField("NAME", 
RecordFieldType.STRING.getDataType(), true));
+            readerFactory.addSchemaField(new RecordField("code", 
RecordFieldType.INT.getDataType(), 0, true));
+            readerFactory.addRecord(1, "name1", null, "test");
+
+            runner.addControllerService("mock-reader-factory", readerFactory);
+            runner.enableControllerService(readerFactory);
+
+            runner.setProperty(UpdateDatabaseTable.RECORD_READER, 
"mock-reader-factory");
+            runner.setProperty(UpdateDatabaseTable.TABLE_NAME, 
"${table.name}");
+            runner.setProperty(UpdateDatabaseTable.CREATE_TABLE, 
UpdateDatabaseTable.FAIL_IF_NOT_EXISTS);
+            runner.setProperty(UpdateDatabaseTable.TRANSLATE_FIELD_NAMES, 
"false");
+            runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER, 
"true");
+            runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS, 
"false");
+            runner.setProperty(UpdateDatabaseTable.DB_TYPE, new 
DerbyDatabaseAdapter().getName());
+            runner.addControllerService("dbcp", service);
+            runner.enableControllerService(service);
+            runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");
+            Map<String, String> attrs = new HashMap<>();
+            attrs.put("db.name", "default");
+            attrs.put("table.name", "persons");
+            runner.enqueue(new byte[0], attrs);
+            runner.run();
+
+            runner.assertTransferCount(UpdateDatabaseTable.REL_SUCCESS, 1);
+            final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(UpdateDatabaseTable.REL_SUCCESS).get(0);
+            
flowFile.assertAttributeEquals(UpdateDatabaseTable.ATTR_OUTPUT_TABLE, 
"persons");
+            // Verify the table has been updated with the expected field(s)
+            try (Statement s = conn.createStatement()) {
+                // The Derby equivalent of DESCRIBE TABLE (using a query 
rather than the ij tool)
+                ResultSet rs = s.executeQuery("SELECT * FROM SYS.SYSCOLUMNS 
WHERE referenceid = (SELECT tableid FROM SYS.SYSTABLES WHERE tablename = 
'persons') ORDER BY columnnumber");
+                assertTrue(rs.next());
+                // Columns 2,3,4 are Column Name, Column Index, and Column Type
+                assertEquals("id", rs.getString(2));
+                assertEquals(1, rs.getInt(3));
+                // Primary key cannot be null, Derby stores that in this column
+                assertEquals("INTEGER NOT NULL", rs.getString(4));
+
+                assertTrue(rs.next());
+                assertEquals("name", rs.getString(2));
+                assertEquals(2, rs.getInt(3));
+                assertEquals("VARCHAR(100)", rs.getString(4));
+
+                assertTrue(rs.next());
+                assertEquals("code", rs.getString(2));
+                assertEquals(3, rs.getInt(3));
+                assertEquals("INTEGER", rs.getString(4));
+
+                assertTrue(rs.next());
+                assertEquals("ID", rs.getString(2));
+                assertEquals(4, rs.getInt(3));
+                assertEquals("INTEGER", rs.getString(4));
+
+                assertTrue(rs.next());
+                assertEquals("NAME", rs.getString(2));
+                assertEquals(5, rs.getInt(3));
+                assertEquals("VARCHAR(100)", rs.getString(4));
+
+                // No more rows
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    /**
+     * Simple implementation only for testing purposes
+     */
+    private static class MockDBCPService extends AbstractControllerService 
implements DBCPService {
+        private final String dbLocation;
+
+        public MockDBCPService(final String dbLocation) {
+            this.dbLocation = dbLocation;
+        }
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                return DriverManager.getConnection("jdbc:derby:" + dbLocation 
+ ";create=true");
+            } catch (final Exception e) {
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
index 579e39ed2b..cea3ee9fe9 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
@@ -17,7 +17,25 @@
 package org.apache.nifi.processors.standard.db.impl;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.standard.db.ColumnDescription;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.TableSchema;
+
+import java.sql.JDBCType;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
+import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NCHAR;
+import static java.sql.Types.NCLOB;
+import static java.sql.Types.NVARCHAR;
+import static java.sql.Types.OTHER;
+import static java.sql.Types.SQLXML;
+import static java.sql.Types.VARCHAR;
 
 /**
  * An implementation of DatabaseAdapter for Derby (used for testing).
@@ -91,4 +109,89 @@ public class DerbyDatabaseAdapter implements 
DatabaseAdapter {
 
         return query.toString();
     }
+
    @Override
    public boolean supportsCreateTableIfNotExists() {
        // This is not actually true for Derby, but returning true lets the tests exercise the
        // "IF NOT EXISTS" code path via a workaround. "Real" adapters must report this accurately.
        return true;
    }
+
+    @Override
+    public String getCreateTableStatement(final TableSchema tableSchema, final 
boolean quoteTableName, final boolean quoteColumnNames) {
+        StringBuilder createTableStatement = new StringBuilder();
+
+        List<ColumnDescription> columns = tableSchema.getColumnsAsList();
+        List<String> columnsAndDatatypes = new ArrayList<>(columns.size());
+        Set<String> primaryKeyColumnNames = 
tableSchema.getPrimaryKeyColumnNames();
+        for (ColumnDescription column : columns) {
+            StringBuilder sb = new StringBuilder()
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(column.getColumnName())
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(" ")
+                    .append(getSQLForDataType(column.getDataType()))
+                    .append(column.isNullable() ? "" : " NOT NULL")
+                    .append(primaryKeyColumnNames != null && 
primaryKeyColumnNames.contains(column.getColumnName()) ? " PRIMARY KEY" : "");
+            columnsAndDatatypes.add(sb.toString());
+        }
+
+        // This will throw an exception if the table already exists, but it 
should only be used for tests
+        createTableStatement.append("CREATE TABLE ")
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(tableSchema.getTableName())
+                .append(quoteTableName ? getTableQuoteString() : "")
+                .append(" (")
+                .append(String.join(", ", columnsAndDatatypes))
+                .append(") ");
+
+        return createTableStatement.toString();
+    }
+
+    @Override
+    public List<String> getAlterTableStatements(final String tableName, final 
List<ColumnDescription> columnsToAdd, final boolean quoteTableName, final 
boolean quoteColumnNames) {
+        List<String> alterTableStatements = new ArrayList<>();
+
+        List<String> columnsAndDatatypes = new 
ArrayList<>(columnsToAdd.size());
+        for (ColumnDescription column : columnsToAdd) {
+            String dataType = getSQLForDataType(column.getDataType());
+            StringBuilder sb = new StringBuilder()
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(column.getColumnName())
+                    .append(quoteColumnNames ? getColumnQuoteString() : "")
+                    .append(" ")
+                    .append(dataType);
+            columnsAndDatatypes.add(sb.toString());
+        }
+
+        for (String columnAndDataType : columnsAndDatatypes) {
+            StringBuilder createTableStatement = new StringBuilder();
+            alterTableStatements.add(createTableStatement.append("ALTER TABLE 
")
+                    .append(quoteTableName ? getTableQuoteString() : "")
+                    .append(tableName)
+                    .append(quoteTableName ? getTableQuoteString() : "")
+                    .append(" ADD COLUMN ")
+                    .append(columnAndDataType).toString());
+        }
+        return alterTableStatements;
+    }
+
+    @Override
+    public String getSQLForDataType(int sqlType) {
+        switch (sqlType) {
+            case CHAR:
+            case LONGNVARCHAR:
+            case LONGVARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+            case VARCHAR:
+            case CLOB:
+            case NCLOB:
+            case OTHER:
+            case SQLXML:
+                // Derby must have a max length specified
+                return "VARCHAR(100)";
+            default:
+                return JDBCType.valueOf(sqlType).getName();
+        }
+    }
 }
\ No newline at end of file


Reply via email to