Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2162#discussion_r166504892 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java --- @@ -249,34 +260,56 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi return; } - // Try to fill the columnTypeMap with the types of the desired max-value columns - final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); - final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + // Try to fill the columnTypeMap with the types of the desired max-value columns + final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue(); final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); try (final Connection con = dbcpService.getConnection(); final Statement st = con.createStatement()) { - // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible - // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read - // approach as in Apache Drill - String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null); - ResultSet resultSet = st.executeQuery(query); - ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); - int numCols = resultSetMetaData.getColumnCount(); - if (numCols > 0) { - if (shouldCleanCache) { - columnTypeMap.clear(); - } - for (int i = 1; i <= numCols; i++) { - String colName = resultSetMetaData.getColumnName(i).toLowerCase(); - String colKey = getStateKey(tableName, colName); - int colType = resultSetMetaData.getColumnType(i); - columnTypeMap.putIfAbsent(colKey, colType); + // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible + // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read + // approach as in Apache Drill + String query; + + if(StringUtils.isEmpty(sqlQuery)) { + query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null); + } else { + StringBuilder sbQuery = getWrappedQuery(sqlQuery, tableName); --- End diff -- I agree with you to avoid adding any logic to build SQL statement. How about adding one more condition at [AbstractDatabaseFetchProcessor.setup where it populates columnTypeMap](https://github.com/apache/nifi/blob/90d7926907b87a832407573ce20bd7ac5ba56bf9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java#L295 ), in order to filter out columns those are not specified by 'Max Value Columns'? This way, we don't have to modify SQL statement, but can minimize the number of columns to be stored in state. How do you think? Specifically, following lines of code: ``` // This part adds all columns into columnTypeMap for custom query. We want to capture maxValueColumns only. The maxValueColumnNameList below can be used to do so. for (int i = 1; i <= numCols; i++) { String colName = resultSetMetaData.getColumnName(i).toLowerCase(); String colKey = getStateKey(tableName, colName); int colType = resultSetMetaData.getColumnType(i); columnTypeMap.putIfAbsent(colKey, colType); } List<String> maxValueColumnNameList = Arrays.asList(maxValueColumnNames.split(",")); for(String maxValueColumn:maxValueColumnNameList){ String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase()); if(!columnTypeMap.containsKey(colKey)){ throw new ProcessException("Column not found in the table/query specified: " + maxValueColumn); } } ```
---