[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371329#comment-16371329 ]
ASF GitHub Bot commented on NIFI-4833: -------------------------------------- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169622554 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end } } + @Override + public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, + final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, + final Collection<Column> columns, final ResultHandler handler) throws IOException { + + try (final Table table = connection.getTable(TableName.valueOf(tableName)); + final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, + timerangeMax, limitRows, isReversed, columns)) { + + int cnt = 0; + final int lim = limitRows != null ? 
limitRows : 0; + for (final Result result : scanner) { + + if (lim > 0 && cnt++ > lim) break; + + final byte[] rowKey = result.getRow(); + final Cell[] cells = result.rawCells(); + + if (cells == null) { + continue; + } + + // convert HBase cells to NiFi cells + final ResultCell[] resultCells = new ResultCell[cells.length]; + for (int i = 0; i < cells.length; i++) { + final Cell cell = cells[i]; + final ResultCell resultCell = getResultCell(cell); + resultCells[i] = resultCell; + } + + // delegate to the handler + handler.handle(rowKey, resultCells); + } + } + + } + + // + protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, + final Integer limitRows, final Boolean isReversed, final Collection<Column> columns) throws IOException { + final Scan scan = new Scan(); + if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); --- End diff -- This should be on separate lines like this: ``` if (!StringUtils.isBlank(startRow)) { scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); } ``` > NIFI-4833 Add ScanHBase processor > --------------------------------- > > Key: NIFI-4833 > URL: https://issues.apache.org/jira/browse/NIFI-4833 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Ed Berezitsky > Assignee: Ed Berezitsky > Priority: Major > > Add ScanHBase (new) processor to retrieve records from HBase tables. > Today there are GetHBase and FetchHBaseRow. GetHBase can pull entire table or > only new rows after processor started; it also must be scheduled and doesn't > support incoming connections. FetchHBaseRow can pull rows with known rowkeys only. 
> This processor could provide functionality similar to what can be achieved > by using the HBase shell, by defining the following properties: > - scan based on a range of row key IDs > - scan based on a range of timestamps > - limit the number of records pulled > - use filters > - reverse rows -- This message was sent by Atlassian JIRA (v7.6.3#76005)