[ 
https://issues.apache.org/jira/browse/NIFI-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353876#comment-16353876
 ] 

ASF GitHub Bot commented on NIFI-1706:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2162#discussion_r166278036
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
 ---
    @@ -249,34 +260,56 @@ public void setup(final ProcessContext context, 
boolean shouldCleanCache, FlowFi
                     return;
                 }
     
    -            // Try to fill the columnTypeMap with the types of the desired 
max-value columns
    -            final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
    -            final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        // Try to fill the columnTypeMap with the types of the desired 
max-value columns
    +        final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
    +        final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        final String sqlQuery = 
context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
     
                 final DatabaseAdapter dbAdapter = 
dbAdapters.get(context.getProperty(DB_TYPE).getValue());
                 try (final Connection con = dbcpService.getConnection();
                      final Statement st = con.createStatement()) {
     
    -                // Try a query that returns no rows, for the purposes of 
getting metadata about the columns. It is possible
    -                // to use DatabaseMetaData.getColumns(), but not all 
drivers support this, notably the schema-on-read
    -                // approach as in Apache Drill
    -                String query = dbAdapter.getSelectStatement(tableName, 
maxValueColumnNames, "1 = 0", null, null, null);
    -                ResultSet resultSet = st.executeQuery(query);
    -                ResultSetMetaData resultSetMetaData = 
resultSet.getMetaData();
    -                int numCols = resultSetMetaData.getColumnCount();
    -                if (numCols > 0) {
    -                    if (shouldCleanCache) {
    -                        columnTypeMap.clear();
    -                    }
    -                    for (int i = 1; i <= numCols; i++) {
    -                        String colName = 
resultSetMetaData.getColumnName(i).toLowerCase();
    -                        String colKey = getStateKey(tableName, colName);
    -                        int colType = resultSetMetaData.getColumnType(i);
    -                        columnTypeMap.putIfAbsent(colKey, colType);
    +            // Try a query that returns no rows, for the purposes of 
getting metadata about the columns. It is possible
    +            // to use DatabaseMetaData.getColumns(), but not all drivers 
support this, notably the schema-on-read
    +            // approach as in Apache Drill
    +            String query;
    +
    +            if(StringUtils.isEmpty(sqlQuery)) {
    +                query = dbAdapter.getSelectStatement(tableName, 
maxValueColumnNames, "1 = 0", null, null, null);
    +            } else {
    +                StringBuilder sbQuery = getWrappedQuery(sqlQuery, 
tableName);
    +                sbQuery.append(" WHERE 1=0");
    +
    +                query = sbQuery.toString();
    +            }
    +
    +            ResultSet resultSet = st.executeQuery(query);
    +            ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
    +            int numCols = resultSetMetaData.getColumnCount();
    +            if (numCols > 0) {
    +                if (shouldCleanCache){
    +                    columnTypeMap.clear();
    +                }
    +                for (int i = 1; i <= numCols; i++) {
    +                    String colName = 
resultSetMetaData.getColumnName(i).toLowerCase();
    +                    String colKey = getStateKey(tableName, colName);
    +                    int colType = resultSetMetaData.getColumnType(i);
    +                    columnTypeMap.putIfAbsent(colKey, colType);
    +                }
    +
    +                List<String> maxValueColumnNameList = 
org.apache.commons.lang3.StringUtils.isEmpty(maxValueColumnNames)
    --- End diff --
    
    I think we can use `org.apache.nifi.util.StringUtils` here instead, which 
is already imported. Moreover, we can remove this emptiness check because it's 
already checked at the beginning of this method.


> Extend QueryDatabaseTable to support arbitrary queries
> ------------------------------------------------------
>
>                 Key: NIFI-1706
>                 URL: https://issues.apache.org/jira/browse/NIFI-1706
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>    Affects Versions: 1.4.0
>            Reporter: Paul Bormans
>            Assignee: Peter Wicks
>            Priority: Major
>              Labels: features
>
> The QueryDatabaseTable is able to observe a configured database table for new 
> rows and yield these into the flowfile. The model of an rdbms however is 
> often (if not always) normalized so you would need to join various tables in 
> order to "flatten" the data into useful events for a processing pipeline as 
> can be build with nifi or various tools within the hadoop ecosystem.
> The request is to extend the processor to specify an arbitrary sql query 
> instead of specifying the table name + columns.
> In addition (this may be another issue?) it is desired to limit the number of 
> rows returned per run. Not just because of bandwidth issue's from the nifi 
> pipeline onwards but mainly because huge databases may not be able to return 
> so many records within a reasonable time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to