Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r169622823
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 ---
    @@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] 
startRow, final byte[] end
             }
         }
     
    +    /**
    +     * Scans the given table and passes each returned row to the supplied handler.
    +     *
    +     * <p>The table is obtained from the current connection and the scanner is built by
    +     * {@code getResults(...)}; both are closed when the scan finishes or fails. Every
    +     * HBase {@code Cell} of a row is converted with {@code getResultCell(...)} before
    +     * the row is handed to the handler.
    +     *
    +     * @param tableName        name of the table to scan
    +     * @param startRow         row key to start at, forwarded to {@code getResults}
    +     * @param endRow           row key to stop at, forwarded to {@code getResults}
    +     * @param filterExpression HBase filter expression, forwarded to {@code getResults}
    +     * @param timerangeMin     lower bound of the time range, forwarded to {@code getResults}
    +     * @param timerangeMax     upper bound of the time range, forwarded to {@code getResults}
    +     * @param limitRows        maximum number of rows to process; {@code null} or a value
    +     *                         that is not positive means no limit
    +     * @param isReversed       scan direction flag, forwarded to {@code getResults}
    +     * @param columns          columns / column families to fetch, forwarded to {@code getResults}
    +     * @param handler          callback invoked once per row with the row key and its cells
    +     * @throws IOException if the table or scanner cannot be opened, read or closed
    +     */
    +    @Override
    +    public void scan(final String tableName, final String startRow, final String endRow, String filterExpression,
    +            final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed,
    +            final Collection<Column> columns, final ResultHandler handler) throws IOException {
    +
    +        try (final Table table = connection.getTable(TableName.valueOf(tableName));
    +                final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
    +                        timerangeMax, limitRows, isReversed, columns)) {
    +
    +            int cnt = 0;
    +            final int lim = limitRows != null ? limitRows : 0;
    +            for (final Result result : scanner) {
    +
    +                // Pre-increment so that exactly 'lim' rows are processed; the previous
    +                // post-increment comparison (cnt++ > lim) let lim + 1 rows through.
    +                // Rows skipped below for having no cells still count toward the limit.
    +                if (lim > 0 && ++cnt > lim) {
    +                    break;
    +                }
    +
    +                final byte[] rowKey = result.getRow();
    +                final Cell[] cells = result.rawCells();
    +
    +                if (cells == null) {
    +                    continue;
    +                }
    +
    +                // convert HBase cells to NiFi cells
    +                final ResultCell[] resultCells = new ResultCell[cells.length];
    +                for (int i = 0; i < cells.length; i++) {
    +                    resultCells[i] = getResultCell(cells[i]);
    +                }
    +
    +                // delegate to the handler
    +                handler.handle(rowKey, resultCells);
    +            }
    +        }
    +
    +    }
    +
    +    //
    +    protected ResultScanner getResults(final Table table, final String 
startRow, final String endRow, final String filterExpression, final Long 
timerangeMin, final Long timerangeMax,
    +            final Integer limitRows, final Boolean isReversed, final 
Collection<Column> columns)  throws IOException {
    +        final Scan scan = new Scan();
    +        if (!StringUtils.isBlank(startRow)) 
scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
    +        if (!StringUtils.isBlank(endRow))   scan.setStopRow(   
endRow.getBytes(StandardCharsets.UTF_8));
    +
    +
    +        Filter filter = null;
    +        if (columns != null) {
    +            for (Column col : columns) {
    +                if (col.getQualifier() == null) {
    +                    scan.addFamily(col.getFamily());
    +                } else {
    +                    scan.addColumn(col.getFamily(), col.getQualifier());
    +                }
    +            }
    +        }else if (!StringUtils.isBlank(filterExpression)) {
    +            ParseFilter parseFilter = new ParseFilter();
    +            filter = parseFilter.parseFilterString(filterExpression);
    +        }
    +        if (filter != null) scan.setFilter(filter);
    +
    +        if (timerangeMin != null && timerangeMax != null) 
scan.setTimeRange(timerangeMin, timerangeMax);
    +
    +        // ->>> reserved for HBase v 2 or later
    +        //if (limitRows != null && limitRows > 0){
    +        //    scan.setLimit(limitRows)
    +        //}
    +
    +        if (isReversed != null) scan.setReversed(isReversed);
    --- End diff --
    
    Same as above.


---

Reply via email to