[ https://issues.apache.org/jira/browse/NIFI-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353876#comment-16353876 ]
ASF GitHub Bot commented on NIFI-1706: -------------------------------------- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2162#discussion_r166278036 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java --- @@ -249,34 +260,56 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi return; } - // Try to fill the columnTypeMap with the types of the desired max-value columns - final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); - final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + // Try to fill the columnTypeMap with the types of the desired max-value columns + final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue(); final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); try (final Connection con = dbcpService.getConnection(); final Statement st = con.createStatement()) { - // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible - // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read - // approach as in Apache Drill - String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null); - ResultSet resultSet = st.executeQuery(query); - ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); - int numCols = resultSetMetaData.getColumnCount(); - if (numCols > 0) { - if (shouldCleanCache) { - columnTypeMap.clear(); - } - for (int i = 1; i <= numCols; i++) { - String colName = resultSetMetaData.getColumnName(i).toLowerCase(); - String colKey = getStateKey(tableName, colName); - int colType = resultSetMetaData.getColumnType(i); - columnTypeMap.putIfAbsent(colKey, colType); + // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible + // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read + // approach as in Apache Drill + String query; + + if(StringUtils.isEmpty(sqlQuery)) { + query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null); + } else { + StringBuilder sbQuery = getWrappedQuery(sqlQuery, tableName); + sbQuery.append(" WHERE 1=0"); + + query = sbQuery.toString(); + } + + ResultSet resultSet = st.executeQuery(query); + ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + int numCols = resultSetMetaData.getColumnCount(); + if (numCols > 0) { + if (shouldCleanCache){ + columnTypeMap.clear(); + } + for (int i = 1; i <= numCols; i++) { + String colName = resultSetMetaData.getColumnName(i).toLowerCase(); + String colKey = getStateKey(tableName, colName); + int colType = resultSetMetaData.getColumnType(i); + columnTypeMap.putIfAbsent(colKey, colType); + } + + List<String> maxValueColumnNameList = org.apache.commons.lang3.StringUtils.isEmpty(maxValueColumnNames) --- End diff -- I think we can use `org.apache.nifi.util.StringUtils` here instead, which is already imported. Moreover, we can remove this emptiness check because it's already checked at the beginning of this method. > Extend QueryDatabaseTable to support arbitrary queries > ------------------------------------------------------ > > Key: NIFI-1706 > URL: https://issues.apache.org/jira/browse/NIFI-1706 > Project: Apache NiFi > Issue Type: Improvement > Components: Core Framework > Affects Versions: 1.4.0 > Reporter: Paul Bormans > Assignee: Peter Wicks > Priority: Major > Labels: features > > The QueryDatabaseTable is able to observe a configured database table for new > rows and yield these into the flowfile. The model of an rdbms however is > often (if not always) normalized so you would need to join various tables in > order to "flatten" the data into useful events for a processing pipeline as > can be build with nifi or various tools within the hadoop ecosystem. > The request is to extend the processor to specify an arbitrary sql query > instead of specifying the table name + columns. > In addition (this may be another issue?) it is desired to limit the number of > rows returned per run. Not just because of bandwidth issue's from the nifi > pipeline onwards but mainly because huge databases may not be able to return > so many records within a reasonable time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)