[ https://issues.apache.org/jira/browse/NIFI-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354881#comment-16354881 ]

ASF GitHub Bot commented on NIFI-1706:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2162#discussion_r166504892
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java ---
    @@ -249,34 +260,56 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
                     return;
                 }
     
    -            // Try to fill the columnTypeMap with the types of the desired max-value columns
    -            final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
    -            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        // Try to fill the columnTypeMap with the types of the desired max-value columns
    +        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
     
                 final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
                 try (final Connection con = dbcpService.getConnection();
                      final Statement st = con.createStatement()) {
     
    -                // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
    -                // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
    -                // approach as in Apache Drill
    -                String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
    -                ResultSet resultSet = st.executeQuery(query);
    -                ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
    -                int numCols = resultSetMetaData.getColumnCount();
    -                if (numCols > 0) {
    -                    if (shouldCleanCache) {
    -                        columnTypeMap.clear();
    -                    }
    -                    for (int i = 1; i <= numCols; i++) {
    -                        String colName = resultSetMetaData.getColumnName(i).toLowerCase();
    -                        String colKey = getStateKey(tableName, colName);
    -                        int colType = resultSetMetaData.getColumnType(i);
    -                        columnTypeMap.putIfAbsent(colKey, colType);
    +            // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
    +            // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
    +            // approach as in Apache Drill
    +            String query;
    +
    +            if(StringUtils.isEmpty(sqlQuery)) {
    +                query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
    +            } else {
    +                StringBuilder sbQuery = getWrappedQuery(sqlQuery, tableName);
    --- End diff --
    
    I agree with you that we should avoid adding any logic to build the SQL statement. How about adding one more condition at [AbstractDatabaseFetchProcessor.setup where it populates columnTypeMap](https://github.com/apache/nifi/blob/90d7926907b87a832407573ce20bd7ac5ba56bf9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java#L295), in order to filter out columns that are not specified by 'Max Value Columns'? This way, we don't have to modify the SQL statement, but can still minimize the number of columns stored in state. What do you think?
    
    Specifically, the following lines of code:
    ```
    // This part adds all columns into columnTypeMap for the custom query. We want
    // to capture maxValueColumns only. The maxValueColumnNameList below can be used to do so.
    for (int i = 1; i <= numCols; i++) {
        String colName = resultSetMetaData.getColumnName(i).toLowerCase();
        String colKey = getStateKey(tableName, colName);
        int colType = resultSetMetaData.getColumnType(i);
        columnTypeMap.putIfAbsent(colKey, colType);
    }

    List<String> maxValueColumnNameList = Arrays.asList(maxValueColumnNames.split(","));

    for (String maxValueColumn : maxValueColumnNameList) {
        String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase());
        if (!columnTypeMap.containsKey(colKey)) {
            throw new ProcessException("Column not found in the table/query specified: " + maxValueColumn);
        }
    }
    ```
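
To illustrate the suggestion above, here is a minimal, self-contained sketch. It is not the actual NiFi code: `getStateKey` is a hypothetical stand-in for the processor's helper of the same name, and a plain `Map` of column names to `java.sql.Types` codes stands in for `ResultSetMetaData`. The point is that only the configured Max Value Columns end up in `columnTypeMap`, and a misconfigured column fails fast:

```java
import java.sql.Types;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MaxValueColumnFilter {

    // Hypothetical stand-in for AbstractDatabaseFetchProcessor.getStateKey().
    static String getStateKey(String tableName, String colName) {
        return tableName.toLowerCase() + "@!@" + colName;
    }

    // Store only the configured Max Value Columns in the state map; throw
    // when a configured column is missing from the query's result set.
    static Map<String, Integer> filterColumnTypes(String tableName,
                                                  String maxValueColumnNames,
                                                  Map<String, Integer> queryColumnTypes) {
        Map<String, Integer> columnTypeMap = new HashMap<>();
        List<String> maxValueColumnNameList = Arrays.asList(maxValueColumnNames.split(","));
        for (String maxValueColumn : maxValueColumnNameList) {
            String colName = maxValueColumn.trim().toLowerCase();
            Integer colType = queryColumnTypes.get(colName);
            if (colType == null) {
                throw new IllegalArgumentException(
                        "Column not found in the table/query specified: " + maxValueColumn);
            }
            columnTypeMap.putIfAbsent(getStateKey(tableName, colName), colType);
        }
        return columnTypeMap;
    }

    public static void main(String[] args) {
        // Column name -> java.sql.Types code, standing in for ResultSetMetaData.
        Map<String, Integer> queryColumns = new LinkedHashMap<>();
        queryColumns.put("id", Types.INTEGER);
        queryColumns.put("updated_at", Types.TIMESTAMP);
        queryColumns.put("payload", Types.VARCHAR);

        Map<String, Integer> state = filterColumnTypes("events", "id, updated_at", queryColumns);
        System.out.println(state.size());  // 2: only the max-value columns are kept
    }
}
```

A variant could keep the existing loop over all columns and afterwards drop entries not in `maxValueColumnNameList`; the net effect on stored state is the same.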



> Extend QueryDatabaseTable to support arbitrary queries
> ------------------------------------------------------
>
>                 Key: NIFI-1706
>                 URL: https://issues.apache.org/jira/browse/NIFI-1706
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>    Affects Versions: 1.4.0
>            Reporter: Paul Bormans
>            Assignee: Peter Wicks
>            Priority: Major
>              Labels: features
>
> The QueryDatabaseTable processor is able to observe a configured database table for new 
> rows and yield them into a FlowFile. The model of an RDBMS, however, is 
> often (if not always) normalized, so you would need to join various tables in 
> order to "flatten" the data into useful events for a processing pipeline, as 
> can be built with NiFi or various tools within the Hadoop ecosystem.
> The request is to extend the processor to accept an arbitrary SQL query 
> instead of specifying the table name + columns.
> In addition (this may be another issue?), it is desirable to limit the number of 
> rows returned per run: not just because of bandwidth issues from the NiFi 
> pipeline onwards, but mainly because huge databases may not be able to return 
> so many records within a reasonable time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
