Repository: nifi
Updated Branches:
  refs/heads/master 18ad34810 -> 2007c207a


NIFI-4393: Handle database specific identifier escape characters

The QueryDatabaseTable and GenerateTableFetch processors were not able
to use the max value state as expected if a max value column was wrapped
with escape characters, due to a mismatch between the computed state
keys and the actual keys used in the managed state: the state keys
computed by the getStateKey method included escape characters while the
actual stored keys did not. This resulted in querying the same dataset
again and again.

This commit adds an unwrapIdentifier method to the DatabaseAdapter
interface to remove database-specific escape characters from identifiers
such as table and column names, so that max value state keys are
populated correctly even if identifiers are wrapped with escape
characters.

This commit also adds a new DatabaseAdapter for MySQL to handle
MySQL-specific identifier escaping with back-ticks.

This closes #2424

Signed-off-by: Mike Thomsen <mikerthom...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2007c207
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2007c207
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2007c207

Branch: refs/heads/master
Commit: 2007c207ab1fac0c1d030cf29623796997650b20
Parents: 18ad348
Author: Koji Kawamura <ijokaruma...@apache.org>
Authored: Tue Jan 23 15:15:36 2018 +0900
Committer: Mike Thomsen <mikerthom...@gmail.com>
Committed: Thu May 3 09:06:05 2018 -0400

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java         | 19 ++++++----
 .../processors/standard/GenerateTableFetch.java | 28 +++++++--------
 .../processors/standard/QueryDatabaseTable.java |  6 ++--
 .../processors/standard/db/DatabaseAdapter.java | 13 +++++++
 .../db/impl/MSSQL2008DatabaseAdapter.java       |  3 +-
 .../standard/db/impl/MSSQLDatabaseAdapter.java  |  6 ++++
 .../standard/db/impl/MySQLDatabaseAdapter.java  | 38 ++++++++++++++++++++
 ....nifi.processors.standard.db.DatabaseAdapter |  3 +-
 .../standard/QueryDatabaseTableTest.java        | 25 +++++++++++--
 9 files changed, 113 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2007c207/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index c7bad42..ef8dd0a 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -289,13 +289,13 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
                     final List<String> maxValueQualifiedColumnNameList = new 
ArrayList<>();
 
                     for (String maxValueColumn:maxValueColumnNameList) {
-                        String colKey = getStateKey(tableName, 
maxValueColumn.trim());
+                        String colKey = getStateKey(tableName, 
maxValueColumn.trim(), dbAdapter);
                         maxValueQualifiedColumnNameList.add(colKey);
                     }
 
                     for (int i = 1; i <= numCols; i++) {
                         String colName = 
resultSetMetaData.getColumnName(i).toLowerCase();
-                        String colKey = getStateKey(tableName, colName);
+                        String colKey = getStateKey(tableName, colName, 
dbAdapter);
 
                         //only include columns that are part of the maximum 
value tracking column list
                         if (!maxValueQualifiedColumnNameList.contains(colKey)) 
{
@@ -307,7 +307,7 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
                     }
 
                     for (String maxValueColumn:maxValueColumnNameList) {
-                        String colKey = getStateKey(tableName, 
maxValueColumn.trim().toLowerCase());
+                        String colKey = getStateKey(tableName, 
maxValueColumn.trim().toLowerCase(), dbAdapter);
                         if (!columnTypeMap.containsKey(colKey)) {
                             throw new ProcessException("Column not found in 
the table/query specified: " + maxValueColumn);
                         }
@@ -506,14 +506,21 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
         }
     }
 
-    protected static String getStateKey(String prefix, String columnName) {
+    /**
+     * Construct a key string for a corresponding state value.
+     * @param prefix A prefix may contain database and table name, or just 
table name, this can be null
+     * @param columnName A column name
+     * @param adapter DatabaseAdapter is used to unwrap identifiers
+     * @return a state key string
+     */
+    protected static String getStateKey(String prefix, String columnName, 
DatabaseAdapter adapter) {
         StringBuilder sb = new StringBuilder();
         if (prefix != null) {
-            sb.append(prefix.toLowerCase());
+            sb.append(adapter.unwrapIdentifier(prefix.toLowerCase()));
             sb.append(NAMESPACE_DELIMITER);
         }
         if (columnName != null) {
-            sb.append(columnName.toLowerCase());
+            sb.append(adapter.unwrapIdentifier(columnName.toLowerCase()));
         }
         return sb.toString();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2007c207/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index e041bed..52a815d 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -218,7 +218,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
             // If an initial max value for column(s) has been specified using 
properties, and this column is not in the state manager, sync them to the state 
property map
             for (final Map.Entry<String, String> maxProp : 
maxValueProperties.entrySet()) {
                 String maxPropKey = maxProp.getKey().toLowerCase();
-                String fullyQualifiedMaxPropKey = getStateKey(tableName, 
maxPropKey);
+                String fullyQualifiedMaxPropKey = getStateKey(tableName, 
maxPropKey, dbAdapter);
                 if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
                     String newMaxPropValue;
                     // If we can't find the value at the fully-qualified key 
name, it is possible (under a previous scheme)
@@ -252,13 +252,13 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
                 String colName = maxValueColumnNameList.get(index);
 
                 maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
-                String maxValue = getColumnStateMaxValue(tableName, 
statePropertyMap, colName);
+                String maxValue = getColumnStateMaxValue(tableName, 
statePropertyMap, colName, dbAdapter);
                 if (!StringUtils.isEmpty(maxValue)) {
-                    if(columnTypeMap.isEmpty() || getColumnType(tableName, 
colName) == null){
+                    if(columnTypeMap.isEmpty() || getColumnType(tableName, 
colName, dbAdapter) == null){
                         // This means column type cache is clean after 
instance reboot. We should re-cache column type
                         super.setup(context, false, finalFileToProcess);
                     }
-                    Integer type = getColumnType(tableName, colName);
+                    Integer type = getColumnType(tableName, colName, 
dbAdapter);
 
                     // Add a condition for the WHERE clause
                     maxValueClauses.add(colName + (index == 0 ? " > " : " >= 
") + getLiteralByType(type, maxValue, dbAdapter.getName()));
@@ -299,7 +299,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
                         // Since this column has been aliased lets check the 
label first,
                         // if there is no label we'll use the column name.
                         String resultColumnName = 
(StringUtils.isNotEmpty(rsmd.getColumnLabel(i)) ? rsmd.getColumnLabel(i) : 
rsmd.getColumnName(i)).toLowerCase();
-                        String fullyQualifiedStateKey = getStateKey(tableName, 
resultColumnName);
+                        String fullyQualifiedStateKey = getStateKey(tableName, 
resultColumnName, dbAdapter);
                         String resultColumnCurrentMax = 
statePropertyMap.get(fullyQualifiedStateKey);
                         if (StringUtils.isEmpty(resultColumnCurrentMax) && 
!isDynamicTableName) {
                             // If we can't find the value at the 
fully-qualified key name and the table name is static, it is possible (under a 
previous scheme)
@@ -334,13 +334,13 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
                     String colName = maxValueColumnNameList.get(index);
 
                     maxValueSelectColumns.add("MAX(" + colName + ") " + 
colName);
-                    String maxValue = getColumnStateMaxValue(tableName, 
statePropertyMap, colName);
+                    String maxValue = getColumnStateMaxValue(tableName, 
statePropertyMap, colName, dbAdapter);
                     if (!StringUtils.isEmpty(maxValue)) {
-                        if(columnTypeMap.isEmpty() || getColumnType(tableName, 
colName) == null){
+                        if(columnTypeMap.isEmpty() || getColumnType(tableName, 
colName, dbAdapter) == null){
                             // This means column type cache is clean after 
instance reboot. We should re-cache column type
                             super.setup(context, false, finalFileToProcess);
                         }
-                        Integer type = getColumnType(tableName, colName);
+                        Integer type = getColumnType(tableName, colName, 
dbAdapter);
 
                         // Add a condition for the WHERE clause
                         maxValueClauses.add(colName + " <= " + 
getLiteralByType(type, maxValue, dbAdapter.getName()));
@@ -410,23 +410,23 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
         }
     }
 
-    private String getColumnStateMaxValue(String tableName, Map<String, 
String> statePropertyMap, String colName) {
-        final String fullyQualifiedStateKey = getStateKey(tableName, colName);
+    private String getColumnStateMaxValue(String tableName, Map<String, 
String> statePropertyMap, String colName, DatabaseAdapter adapter) {
+        final String fullyQualifiedStateKey = getStateKey(tableName, colName, 
adapter);
         String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
         if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
             // If the table name is static and the fully-qualified key was not 
found, try just the column name
-            maxValue = statePropertyMap.get(getStateKey(null, colName));
+            maxValue = statePropertyMap.get(getStateKey(null, colName, 
adapter));
         }
 
         return maxValue;
     }
 
-    private Integer getColumnType(String tableName, String colName) {
-        final String fullyQualifiedStateKey = getStateKey(tableName, colName);
+    private Integer getColumnType(String tableName, String colName, 
DatabaseAdapter adapter) {
+        final String fullyQualifiedStateKey = getStateKey(tableName, colName, 
adapter);
         Integer type = columnTypeMap.get(fullyQualifiedStateKey);
         if (type == null && !isDynamicTableName) {
             // If the table name is static and the fully-qualified key was not 
found, try just the column name
-            type = columnTypeMap.get(getStateKey(null, colName));
+            type = columnTypeMap.get(getStateKey(null, colName, adapter));
         }
 
         return type;

http://git-wip-us.apache.org/repos/asf/nifi/blob/2007c207/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 254c9cb..17e47f5 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -263,7 +263,7 @@ public class QueryDatabaseTable extends 
AbstractDatabaseFetchProcessor {
         //If an initial max value for column(s) has been specified using 
properties, and this column is not in the state manager, sync them to the state 
property map
         for (final Map.Entry<String, String> maxProp : 
maxValueProperties.entrySet()) {
             String maxPropKey = maxProp.getKey().toLowerCase();
-            String fullyQualifiedMaxPropKey = getStateKey(tableName, 
maxPropKey);
+            String fullyQualifiedMaxPropKey = getStateKey(tableName, 
maxPropKey, dbAdapter);
             if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
                 String newMaxPropValue;
                 // If we can't find the value at the fully-qualified key name, 
it is possible (under a previous scheme)
@@ -433,7 +433,7 @@ public class QueryDatabaseTable extends 
AbstractDatabaseFetchProcessor {
         if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != 
null) {
             IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
                 String colName = maxValColumnNames.get(index);
-                String maxValueKey = getStateKey(tableName, colName);
+                String maxValueKey = getStateKey(tableName, colName, 
dbAdapter);
                 String maxValue = stateMap.get(maxValueKey);
                 if (StringUtils.isEmpty(maxValue)) {
                     // If we can't find the value at the fully-qualified key 
name, it is possible (under a previous scheme)
@@ -488,7 +488,7 @@ public class QueryDatabaseTable extends 
AbstractDatabaseFetchProcessor {
                 if (nrOfColumns > 0) {
                     for (int i = 1; i <= nrOfColumns; i++) {
                         String colName = meta.getColumnName(i).toLowerCase();
-                        String fullyQualifiedMaxValueKey = 
getStateKey(tableName, colName);
+                        String fullyQualifiedMaxValueKey = 
getStateKey(tableName, colName, dbAdapter);
                         Integer type = 
columnTypeMap.get(fullyQualifiedMaxValueKey);
                         // Skip any columns we're not keeping track of or 
whose value is null
                         if (type == null || resultSet.getObject(i) == null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/2007c207/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
index 1d4f64a..4e1ad47 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
@@ -37,4 +37,17 @@ public interface DatabaseAdapter {
      * @return A String containing a SQL SELECT statement with the given 
clauses applied
      */
     String getSelectStatement(String tableName, String columnNames, String 
whereClause, String orderByClause, Long limit, Long offset);
+
+    /**
+     * <p>Returns a bare identifier string by removing wrapping escape 
characters
+     * from identifier strings such as table and column names.</p>
+     * <p>The default implementation of this method removes double quotes.
+     * If the target database engine supports different escape characters, 
then its DatabaseAdapter implementation should override
+     * this method so that such escape characters can be removed properly.</p>
+     * @param identifier An identifier which may be wrapped with escape 
characters
+     * @return An unwrapped identifier string, or null if the input identifier 
is null
+     */
+    default String unwrapIdentifier(String identifier) {
+        return identifier == null ? null : identifier.replaceAll("\"", "");
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2007c207/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java
index 719f357..3759cb4 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java
@@ -17,12 +17,11 @@
 package org.apache.nifi.processors.standard.db.impl;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 
 /**
  * A database adapter that generates MS SQL Compatible SQL for version 2008.
  */
-public class MSSQL2008DatabaseAdapter implements DatabaseAdapter {
+public class MSSQL2008DatabaseAdapter extends MSSQLDatabaseAdapter {
     @Override
     public String getName() {
         return "MS SQL 2008";

http://git-wip-us.apache.org/repos/asf/nifi/blob/2007c207/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
index 16f6532..3d23d9f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
@@ -79,4 +79,10 @@ public class MSSQLDatabaseAdapter implements DatabaseAdapter 
{
 
         return query.toString();
     }
+
+    @Override
+    public String unwrapIdentifier(String identifier) {
+        // Remove double quotes and square brackets.
+        return identifier == null ? null : identifier.replaceAll("[\"\\[\\]]", 
"");
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2007c207/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MySQLDatabaseAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MySQLDatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MySQLDatabaseAdapter.java
new file mode 100644
index 0000000..fdbc205
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MySQLDatabaseAdapter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+/**
+ * A generic database adapter that generates MySQL compatible SQL.
+ */
+public class MySQLDatabaseAdapter extends GenericDatabaseAdapter {
+    @Override
+    public String getName() {
+        return "MySQL";
+    }
+
+    @Override
+    public String getDescription() {
+        return "Generates MySQL compatible SQL";
+    }
+
+    @Override
+    public String unwrapIdentifier(String identifier) {
+        // Removes double quotes and back-ticks.
+        return identifier == null ? null : identifier.replaceAll("[\"`]", "");
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2007c207/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
index fe2df96..2f53cf7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
@@ -16,4 +16,5 @@ 
org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter
 org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter
 org.apache.nifi.processors.standard.db.impl.Oracle12DatabaseAdapter
 org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter
-org.apache.nifi.processors.standard.db.impl.MSSQL2008DatabaseAdapter
\ No newline at end of file
+org.apache.nifi.processors.standard.db.impl.MSSQL2008DatabaseAdapter
+org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2007c207/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 510156f..ff6a7f0 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -29,6 +29,8 @@ import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter;
 import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
@@ -145,16 +147,35 @@ public class QueryDatabaseTableTest {
         maxValues.put("id", "509");
         StateManager stateManager = runner.getStateManager();
         stateManager.setState(maxValues, Scope.CLUSTER);
-        processor.putColumnType("mytable" + 
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER);
+        processor.putColumnType(processor.getStateKey("mytable", "id", 
dbAdapter), Types.INTEGER);
         query = processor.getQuery(dbAdapter, "myTable", null, 
Collections.singletonList("id"), null, 
stateManager.getState(Scope.CLUSTER).toMap());
         assertEquals("SELECT * FROM myTable WHERE id > 509", query);
 
         maxValues.put("date_created", "2016-03-07 12:34:56");
         stateManager.setState(maxValues, Scope.CLUSTER);
-        processor.putColumnType("mytable" + 
AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", 
Types.TIMESTAMP);
+        processor.putColumnType(processor.getStateKey("mytable", 
"date_created", dbAdapter), Types.TIMESTAMP);
         query = processor.getQuery(dbAdapter, "myTable", null, 
Arrays.asList("id", "DATE_CREATED"), null, 
stateManager.getState(Scope.CLUSTER).toMap());
         assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= 
'2016-03-07 12:34:56'", query);
 
+        // Double quotes can be used to escape column and table names with 
most ANSI compatible database engines.
+        maxValues.put("mytable@!@date-created", "2016-03-07 12:34:56");
+        stateManager.setState(maxValues, Scope.CLUSTER);
+        processor.putColumnType(processor.getStateKey("\"myTable\"", 
"\"DATE-CREATED\"", dbAdapter), Types.TIMESTAMP);
+        query = processor.getQuery(dbAdapter, "\"myTable\"", null, 
Arrays.asList("id", "\"DATE-CREATED\""), null, 
stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM \"myTable\" WHERE id > 509 AND 
\"DATE-CREATED\" >= '2016-03-07 12:34:56'", query);
+
+        // Back-ticks can be used to escape MySQL column and table names.
+        dbAdapter = new MySQLDatabaseAdapter();
+        processor.putColumnType(processor.getStateKey("`myTable`", 
"`DATE-CREATED`", dbAdapter), Types.TIMESTAMP);
+        query = processor.getQuery(dbAdapter, "`myTable`", null, 
Arrays.asList("id", "`DATE-CREATED`"), null, 
stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM `myTable` WHERE id > 509 AND 
`DATE-CREATED` >= '2016-03-07 12:34:56'", query);
+
+        // Square brackets can be used to escape Microsoft SQL Server column 
and table names.
+        dbAdapter = new MSSQLDatabaseAdapter();
+        processor.putColumnType(processor.getStateKey("[myTable]", 
"[DATE-CREATED]", dbAdapter), Types.TIMESTAMP);
+        query = processor.getQuery(dbAdapter, "[myTable]", null, 
Arrays.asList("id", "[DATE-CREATED]"), null, 
stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM [myTable] WHERE id > 509 AND 
[DATE-CREATED] >= '2016-03-07 12:34:56'", query);
+
         // Test Oracle strategy
         dbAdapter = new OracleDatabaseAdapter();
         query = processor.getQuery(dbAdapter, "myTable", null, 
Arrays.asList("id", "DATE_CREATED"), "type = \"CUSTOMER\"", 
stateManager.getState(Scope.CLUSTER).toMap());

Reply via email to