Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r169622823

    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java ---
    @@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end
             }
         }

    +    @Override
    +    public void scan(final String tableName, final String startRow, final String endRow, String filterExpression,
    +            final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed,
    +            final Collection<Column> columns, final ResultHandler handler) throws IOException {
    +
    +        try (final Table table = connection.getTable(TableName.valueOf(tableName));
    +                final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
    +                        timerangeMax, limitRows, isReversed, columns)) {
    +
    +            int cnt = 0;
    +            final int lim = limitRows != null ? limitRows : 0;
    +            for (final Result result : scanner) {
    +
    +                if (lim > 0 && cnt++ > lim) break;
    +
    +                final byte[] rowKey = result.getRow();
    +                final Cell[] cells = result.rawCells();
    +
    +                if (cells == null) {
    +                    continue;
    +                }
    +
    +                // convert HBase cells to NiFi cells
    +                final ResultCell[] resultCells = new ResultCell[cells.length];
    +                for (int i = 0; i < cells.length; i++) {
    +                    final Cell cell = cells[i];
    +                    final ResultCell resultCell = getResultCell(cell);
    +                    resultCells[i] = resultCell;
    +                }
    +
    +                // delegate to the handler
    +                handler.handle(rowKey, resultCells);
    +            }
    +        }
    +
    +    }
    +
    +    //
    +    protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
    +            final Integer limitRows, final Boolean isReversed, final Collection<Column> columns) throws IOException {
    +        final Scan scan = new Scan();
    +        if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
    +        if (!StringUtils.isBlank(endRow)) scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8));
    +
    +
    +        Filter filter = null;
    +        if (columns != null) {
    +            for (Column col : columns) {
    +                if (col.getQualifier() == null) {
    +                    scan.addFamily(col.getFamily());
    +                } else {
    +                    scan.addColumn(col.getFamily(), col.getQualifier());
    +                }
    +            }
    +        }else if (!StringUtils.isBlank(filterExpression)) {
    +            ParseFilter parseFilter = new ParseFilter();
    +            filter = parseFilter.parseFilterString(filterExpression);
    +        }
    +        if (filter != null) scan.setFilter(filter);
    +
    +        if (timerangeMin != null && timerangeMax != null) scan.setTimeRange(timerangeMin, timerangeMax);
    +
    +        // ->>> reserved for HBase v 2 or later
    +        //if (limitRows != null && limitRows > 0){
    +        //    scan.setLimit(limitRows)
    +        //}
    +
    +        if (isReversed != null) scan.setReversed(isReversed);
    --- End diff --

    Same as above.

---