[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396978#comment-16396978 ]

ASF subversion and git services commented on NIFI-4833:
--------------------------------------------------------

Commit c2616e6fe78da68f391c3a4e5bb8a4a44d6b62a9 in nifi's branch refs/heads/master from [~Berezitsky]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=c2616e6 ]

NIFI-4833 Add ScanHBase Processor

- New processor for scanning HBase records based on various parameters, such as a range of row keys or a range of timestamps. Supports a result limit and reverse scan.
- Adds Atlas support for the ScanHBase processor
- Fixed use of a non-recent version of the flow file (FF)
- Formatting and style changes
- Converted single-line if-then statements to multiline
- Removed HTML formatting that is not used for doc generation
- Fixed issue with limitRows
- Fixed issue with filter expression
- Refactored "processRows"
- Fixed possible NPE for the bulkSize var
- Changed provenance to "receive" to indicate new data from an external source
- Updated min/max timestamp custom validation
- JSON array support
- Removed in-memory caching for records; records are now written directly to the FF
- Removed the unfinished flow file from the session and transferred the original to "failure"; test cases updated as well

This closes #2478.

Signed-off-by: Bryan Bende

> NIFI-4833 Add ScanHBase processor
> ---------------------------------
>
> Key: NIFI-4833
> URL: https://issues.apache.org/jira/browse/NIFI-4833
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Ed Berezitsky
> Assignee: Ed Berezitsky
> Priority: Major
>
> Add a new ScanHBase processor to retrieve records from HBase tables.
> Today there are GetHBase and FetchHBaseRow. GetHBase can pull an entire table or
> only new rows after the processor started; it also must be scheduled and doesn't
> support incoming flow files. FetchHBaseRow can pull rows with known row keys only.
> This processor could provide functionality similar to what can be achieved
> with the hbase shell, by defining the following properties:
> - scan based on a range of row key IDs
> - scan based on a range of timestamps
> - limit the number of records pulled
> - use filters
> - reverse rows

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396983#comment-16396983 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2478
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396969#comment-16396969 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2478

Latest update looks good, going to merge.
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392985#comment-16392985 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2478

@bdesert thanks for the updates. There is one error case I think we need to handle, and then we should be good to go...

The case is: if an exception happens half-way through handling the results and gets caught in the try-catch on lines 385-389, we currently only transfer the original flow file to failure and return, but the handler may have a flow file it created and was writing results to, and this would need to be removed from the session.

The reason this isn't caught in the JUnit tests is that currently MockHBaseClientService lets you set throwException, which throws an exception right at the beginning of scan, before the handler has ever been called. If you want to create a way to test this, you could introduce a new boolean like throwExceptionAfterNumResults and also take in an integer number of results.

For quick testing I hacked a change into the MockHBaseClientService so that it throws an exception after the first result:

```
@Override
public void scan(String tableName, String startRow, String endRow, String filterExpression,
        Long timerangeMin, Long timerangeMax, Integer limitRows, Boolean isReversed,
        Collection<Column> columns, ResultHandler handler) throws IOException {
    //if (throwException) {
    //    throw new IOException("exception");
    //}

    // pass all the staged data to the handler
    int resultCount = 0;
    for (final Map.Entry<String, ResultCell[]> entry : results.entrySet()) {
        handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue());
        resultCount++;
        if (resultCount > 0) {
            throw new IOException("exception");
        }
    }

    // delegate to the handler
    numScans++;
}
```

Then updated the test case:

```
@Test
public void testScanWhenScanThrowsException() {
    //hBaseClientService.setThrowException(true);

    final Map<String, String> cells = new HashMap<>();
    cells.put("cq1", "val1");
    cells.put("cq2", "val2");

    final long ts1 = 123456789;
    hBaseClientService.addResult("row1", cells, ts1);
    hBaseClientService.addResult("row2", cells, ts1);

    runner.setProperty(ScanHBase.TABLE_NAME, "table1");
    runner.setProperty(ScanHBase.START_ROW, "row1");
    runner.setProperty(ScanHBase.END_ROW, "row1");

    runner.enqueue("trigger flow file");
    runner.run();

    runner.assertTransferCount(ScanHBase.REL_FAILURE, 1);
    runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
    runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);

    Assert.assertEquals(0, hBaseClientService.getNumScans());
}
```
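The cleanup described above, discarding a half-written result flow file when the scan fails mid-stream so only the original is routed to failure, can be sketched outside the NiFi API as a generic try/catch cleanup pattern. The class and method names here are illustrative stand-ins, not NiFi's actual API; in the processor itself the equivalent step is removing the handler's flow file from the session inside the catch block.

```java
import java.util.ArrayList;
import java.util.List;

class PartialResultCleanup {

    // Stands in for the session's set of created "flow files"
    // (modeled here as string buffers for illustration).
    static List<StringBuilder> session = new ArrayList<>();

    static String scan(boolean failMidway) {
        StringBuilder resultFF = new StringBuilder(); // flow file created by the handler
        session.add(resultFF);
        try {
            resultFF.append("row1\n");
            if (failMidway) {
                throw new RuntimeException("scan failed after first row");
            }
            resultFF.append("row2\n");
            return resultFF.toString();
        } catch (RuntimeException e) {
            // On failure, remove the half-written flow file from the session
            // so it is never transferred downstream.
            session.remove(resultFF);
            return null; // caller routes the original flow file to failure
        }
    }

    public static void main(String[] args) {
        System.out.println(scan(false) != null && session.size() == 1);
        session.clear();
        System.out.println(scan(true) == null && session.isEmpty());
    }
}
```

On success the result buffer stays in the session; on a mid-scan failure it is removed, which is the behavior the suggested throwExceptionAfterNumResults test would exercise.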
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390794#comment-16390794 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bdesert commented on the issue:

    https://github.com/apache/nifi/pull/2478

@bbende , Bryan, committed the changes. Tested on a cluster, works as expected. When you have time, please review.
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386463#comment-16386463 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bdesert commented on the issue:

    https://github.com/apache/nifi/pull/2478

@bbende , thank you! Both comments make sense. Will commit these changes soon.
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386279#comment-16386279 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2478

One other question: what do you envision people will most likely do with the output of this processor? The reason I'm asking is that I'm debating whether it makes sense to write multiple JSON documents to a single flow file without wrapping them in an array. GetHBase and FetchHBaseRow didn't have this problem because they wrote a row per flow file (which probably wasn't a good idea for GetHBase).

As an example scenario, say we have a bunch of rows coming out of this processor using the col-qual-val format like:

```
{"id":"", "message":"The time is Mon Mar 05 10:20:07 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:21:03 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
```

If we then created a schema for this:

```
{
  "name": "scan",
  "namespace": "nifi",
  "type": "record",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "message", "type": "string" }
  ]
}
```

Then tried to use ConvertRecord with a JsonTreeReader and CsvRecordSetWriter to convert from JSON to CSV, we get:

```
id,message
"",The time is Mon Mar 05 10:20:07 EST 2018
```

It only ends up converting the first JSON document, because the JsonTreeReader doesn't know how to read multiple records unless it's a JSON array.

There may be cases where the current output makes sense, so I'm not saying to change it yet; I'm just trying to think of what the most common scenario will be.
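The array-wrapping being discussed can be sketched as a small join: instead of concatenating per-row JSON documents one per line, the serializer emits them inside `[` and `]` with comma separators, so a record reader sees one valid JSON array of records. The class name and helper below are illustrative, not the actual ScanHBase serializer code.

```java
import java.util.List;
import java.util.StringJoiner;

class JsonArrayWrap {

    // Joins per-row JSON documents into a single JSON array, which a
    // JSON record reader can consume as multiple records. Bare
    // newline-delimited documents would only yield the first record.
    static String wrap(List<String> rowDocs) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        rowDocs.forEach(joiner::add);
        return joiner.toString();
    }

    public static void main(String[] args) {
        List<String> rows = List.of(
                "{\"id\":\"r1\",\"message\":\"m1\"}",
                "{\"id\":\"r2\",\"message\":\"m2\"}");
        // prints [{"id":"r1","message":"m1"},{"id":"r2","message":"m2"}]
        System.out.println(wrap(rows));
    }
}
```

With this shape, the ConvertRecord example above would see both rows rather than stopping after the first document.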
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386243#comment-16386243 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bbende commented on the issue:

    https://github.com/apache/nifi/pull/2478

@bdesert Thanks for the updates. I was reviewing the code again and I think we need to change the way the `ScanHBaseResultHandler` works...

Currently it adds rows to a list in memory until the bulk size is reached, and since bulk size defaults to 0, the default case will be that the bulk size is never reached and all the rows are left as "hanging" rows. This means if someone scans a table with 1 million rows, all 1 million will be in memory before being written to the flow file, which would not be good for memory usage.

We should be able to write row by row to the flow file and never add them to a list. Inside the handler we can use `session.append(flowFile, (out) ->` to append a row at a time to the flow file. I think we can then do away with the "hanging rows" concept because there won't be anything buffered in memory.
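The streaming idea above, write each row to the output as it arrives rather than buffering the whole scan result in a list, can be sketched with a plain OutputStream standing in for the flow file content (the class and method names are illustrative, not NiFi's `session.append` API itself):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

class StreamingRows {

    // Writes one serialized row to the output as soon as it arrives,
    // instead of accumulating all rows in a List first. Memory usage
    // stays constant regardless of how many rows the scan returns.
    static void writeRow(OutputStream out, String serializedRow) throws IOException {
        out.write(serializedRow.getBytes(StandardCharsets.UTF_8));
        out.write('\n');
    }

    public static void main(String[] args) throws IOException {
        // Stands in for the flow file content that session.append targets.
        ByteArrayOutputStream content = new ByteArrayOutputStream();
        for (int i = 1; i <= 3; i++) {
            writeRow(content, "{\"row\":" + i + "}"); // one append per scanned row
        }
        System.out.print(content.toString(StandardCharsets.UTF_8));
    }
}
```

In the handler, each `writeRow` call would correspond to one `session.append(flowFile, (out) -> ...)` invocation, which is why no "hanging rows" list is needed.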
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377916#comment-16377916 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bdesert commented on the issue:

    https://github.com/apache/nifi/pull/2478

@bbende , I addressed all the comments. Thank you, and let me know if you see more issues or have some recommendations/suggestions.
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377383#comment-16377383 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bdesert commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r170700319

--- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonFullRowSerializer;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hbase", "scan", "fetch", "get"})
+@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end ),"
+        + "by time range, by filter expression, or any combination of them. "
+        + "Order of records can be controlled by a property Reversed"
+        + "Number of rows retrieved by the processor can be limited.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
+    @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."),
+    @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"),
+    @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"),
+    @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. "
+        + "Could be null (not present) if transfered to FAILURE")
+})
+publi
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377369#comment-16377369 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bdesert commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r170697126
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377316#comment-16377316 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r170686168
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377317#comment-16377317 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r170688235

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    @@ -0,0 +1,564 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.hbase.io.JsonFullRowSerializer;
    +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
    +import org.apache.nifi.hbase.io.RowSerializer;
    +import org.apache.nifi.hbase.scan.Column;
    +import org.apache.nifi.hbase.scan.ResultCell;
    +import org.apache.nifi.hbase.scan.ResultHandler;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.Tuple;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"hbase", "scan", "fetch", "get"})
    +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from an HBase table "
    +        + "by specifying a range of rowkey values (start and/or end), by time range, by filter expression, or any combination of them. "
    +        + "The order of records can be controlled by the Reversed property. "
    +        + "The number of rows retrieved by the processor can be limited.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
    +    @WritesAttribute(attribute = "hbase.resultset", description = "One or more JSON documents representing the rows. This attribute is only written when a Destination of flowfile-attributes is selected."),
    +    @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content; not set or modified otherwise"),
    +    @WritesAttribute(attribute = "hbase.rows.count", description = "The number of rows in the content of the given flow file"),
    +    @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in the given HBase table with the provided conditions. "
    +            + "Could be null (not present) if transferred to FAILURE")
    +})
    +public
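The capability description above boils down to four scan controls: a rowkey range, a timestamp range, a row limit, and reverse ordering. The quoted diff shows only the annotations, so here is a rough, dependency-free sketch of how those controls compose (plain Java; the class, the `Row` model, and the `scan` method are all hypothetical illustrations, not part of the actual processor):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory model of the scan controls named in the
// @CapabilityDescription: rowkey range, time range, limit, reverse.
class ScanSemanticsSketch {

    static final class Row {
        final String key;
        final long timestamp;
        Row(String key, long timestamp) { this.key = key; this.timestamp = timestamp; }
    }

    // Rows are assumed sorted lexicographically by key, as HBase stores them.
    static List<String> scan(List<Row> sorted, String startRow, String endRow,
                             long minTime, long maxTime, int limit, boolean reversed) {
        List<String> out = new ArrayList<>();
        for (Row r : sorted) {
            if (startRow != null && r.key.compareTo(startRow) < 0) continue; // before start rowkey
            if (endRow != null && r.key.compareTo(endRow) > 0) continue;     // after end rowkey
            if (r.timestamp < minTime || r.timestamp > maxTime) continue;    // outside time range
            out.add(r.key);
        }
        if (reversed) Collections.reverse(out);                              // "Reversed" property
        if (limit > 0 && out.size() > limit) out = out.subList(0, limit);    // row limit
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row("a", 10), new Row("b", 20), new Row("c", 30));
        // Range a..c, timestamps 0..25, limit 2, reversed: c is filtered out by time.
        System.out.println(scan(rows, "a", "c", 0, 25, 2, true)); // prints [b, a]
    }
}
```
In the real processor these controls are exposed as properties and pushed down to the HBase client rather than applied in memory; the sketch only illustrates how the filters combine.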
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377319#comment-16377319 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r170687174

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377318#comment-16377318 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r170684472

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377315#comment-16377315 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r170683857

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."),
    --- End diff --

    Can this be removed? Doesn't look like we allow writing the results to an attribute, which is a good thing :)
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371889#comment-16371889 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bdesert commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r169749383

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371808#comment-16371808 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bdesert commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r169732184

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    --- End diff --

    I was thinking about that before. Couldn't really decide, and then I took a look at DeleteHBaseRow and FetchHBaseRow and decided to keep it consistent.

> NIFI-4833 Add ScanHBase processor
> ---------------------------------
>
> Key: NIFI-4833
> URL: https://issues.apache.org/jira/browse/NIFI-4833
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Ed Berezitsky
> Assignee: Ed Berezitsky
> Priority: Major
>
> Add ScanHBase (new) processor to retrieve records from HBase tables.
> Today there are GetHBase and FetchHBaseRow. GetHBase can pull an entire table or only new rows after the processor started; it also must be scheduled and doesn't support incoming flow files. FetchHBaseRow can pull rows with known rowkeys only.
> This processor could provide functionality similar to what can be reached by using the hbase shell, defining the following properties:
> - scan based on a range of row key IDs
> - scan based on a range of time stamps
> - limit the number of records pulled
> - use filters
> - reverse rows

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371566#comment-16371566 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bdesert commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r169680760

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java ---
    @@ -0,0 +1,375 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +public class TestScanHBase {
    +
    +    private ScanHBase proc;
    +    private MockHBaseClientService hBaseClientService;
    +    private TestRunner runner;
    +
    +    @Before
    +    public void setup() throws InitializationException {
    +        proc = new ScanHBase();
    +        runner = TestRunners.newTestRunner(proc);
    +
    +        hBaseClientService = new MockHBaseClientService();
    +        runner.addControllerService("hbaseClient", hBaseClientService);
    +        runner.enableControllerService(hBaseClientService);
    +        runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
    +    }
    +
    +    @Test
    +    public void testColumnsValidation() {
    +        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
    +        runner.setProperty(ScanHBase.START_ROW, "row1");
    +        runner.setProperty(ScanHBase.END_ROW, "row1");
    +        runner.assertValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1");
    +        runner.assertValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1");
    +        runner.assertValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3");
    +        runner.assertValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3");
    +        runner.assertValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3");
    +        runner.assertNotValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3");
    +        runner.assertNotValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,");
    +        runner.assertNotValid();
    +    }
    +
    +    @Test
    +    public void testNoIncomingFlowFile() {
    +        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
    +        runner.setProperty(ScanHBase.START_ROW, "row1");
    +        runner.setProperty(ScanHBase.END_ROW, "row1");
    +
    +        runner.run();
    +        runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
    +        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
    +        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
    +
    +        Assert.assertEquals(0, hBaseClientService.getNumScans());
    +    }
    +
    +    @Test
    +    public void testInvalidTableName() {
    +        runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}");
    +        runner.setProperty(ScanHBase.START_ROW, "row1");
    +        runner.setProperty(ScanHBase.END_ROW, "row1");
    +
    +        runner.enqueue("trigger flow file");
    +        runner.run();
    +
    +        runner.assertTransferCount(ScanHBase.REL_FAILURE, 1);
    +        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
    +        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
    +
    +        Assert.assertEquals(0, hBaseClientService.getNumScans());
    +    }
    +
    +    @Test
    +    public void testResultsNotFound() {
    +        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
    +        runner.setProperty(ScanHBase.START_ROW, "row1");
    +        runner
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371558#comment-16371558 ] ASF GitHub Bot commented on NIFI-4833: -- Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169677516 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java --- @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestScanHBase { + +private ScanHBase proc; +private MockHBaseClientService hBaseClientService; +private TestRunner runner; + +@Before +public void setup() throws InitializationException { +proc = new ScanHBase(); +runner = TestRunners.newTestRunner(proc); + +hBaseClientService = new MockHBaseClientService(); +runner.addControllerService("hbaseClient", hBaseClientService); +runner.enableControllerService(hBaseClientService); +runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient"); +} + +@Test +public void testColumnsValidation() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,"); +runner.assertNotValid(); +} + +@Test +public void testNoIncomingFlowFile() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); + +runner.run(); +runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); 
+runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); +runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0); + +Assert.assertEquals(0, hBaseClientService.getNumScans()); +} + +@Test +public void testInvalidTableName() { +runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); --- End diff -- Not setting a value for "hbase.table" is intentional. This test covers failure handling when the expression is invalid (cannot be evaluated): the flowfile is expected at REL_FAILURE without any scans being performed. If I just didn't understand what you meant, please let me know.
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371348#comment-16371348 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2478 @bbende @ijokarumawak +1 LGTM but it needs a second look. I tested it with HBase locally and it was able to work just fine with what I threw at it. > NIFI-4833 Add ScanHBase processor > - > > Key: NIFI-4833 > URL: https://issues.apache.org/jira/browse/NIFI-4833 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Ed Berezitsky >Assignee: Ed Berezitsky >Priority: Major > > Add ScanHBase (new) processor to retrieve records from HBase tables. > Today there are GetHBase and FetchHBaseRow. GetHBase can pull entire table or > only new rows after processor started; it also must be scheduled and doesn't > support incoming . FetchHBaseRow can pull rows with known rowkeys only. > This processor could provide functionality similar to what could be reached > by using hbase shell, defining following properties: > -scan based on range of row key IDs > -scan based on range of time stamps > -limit number of records pulled > -use filters > -reverse rows -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371343#comment-16371343 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169624523 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. \n" ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +})
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371337#comment-16371337 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169623164 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) --- End diff -- Are you 
sure you don't want to do `INPUT_ALLOWED`? I can see good reasons not to do that. If you want to make input optional, see `ExecuteSQL` for some code you can steal that keeps it from processing on both flowfile input AND the timer interval.
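For context, the `ExecuteSQL` pattern being referred to boils down to gating `onTrigger` on whether an incoming connection exists and whether a flowfile is actually queued. The sketch below models that decision only; `Context` and `Session` are simplified stand-ins for NiFi's `ProcessContext` and `ProcessSession`, not the real API:

```java
// Sketch of ExecuteSQL-style input gating: with optional input, a processor
// must not fall back to timer-driven work while it is wired to an upstream
// connection whose queue happens to be empty.
public class InputGatingSketch {

    // Stand-in for ProcessContext (assumption: only the one flag we need).
    static class Context {
        final boolean hasIncomingConnection;
        Context(boolean hasIncomingConnection) { this.hasIncomingConnection = hasIncomingConnection; }
    }

    // Stand-in for ProcessSession; get() returns null when the queue is empty.
    static class Session {
        final Object queuedFlowFile;
        Session(Object queuedFlowFile) { this.queuedFlowFile = queuedFlowFile; }
        Object get() { return queuedFlowFile; }
    }

    /** Returns "scan" when a scan should run, "skip" when onTrigger should return early. */
    static String onTrigger(Context context, Session session) {
        Object flowFile = null;
        if (context.hasIncomingConnection) {
            flowFile = session.get();
            // Connected but nothing queued: yield instead of running a
            // timer-driven scan, otherwise data gets processed twice.
            if (flowFile == null) {
                return "skip";
            }
        }
        // Timer-driven (no incoming connection) or driven by a queued flowfile.
        return "scan";
    }
}
```

The real `ExecuteSQL` additionally distinguishes loop connections (via `ProcessContext`) before deciding to return early; that refinement is omitted here for brevity.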
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371331#comment-16371331 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169622730 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end } } +@Override +public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, +final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, +final Collection columns, final ResultHandler handler) throws IOException { + +try (final Table table = connection.getTable(TableName.valueOf(tableName)); +final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, +timerangeMax, limitRows, isReversed, columns)) { + +int cnt = 0; +final int lim = limitRows != null ? 
limitRows : 0; +for (final Result result : scanner) { + +if (lim > 0 && cnt++ > lim) break; + +final byte[] rowKey = result.getRow(); +final Cell[] cells = result.rawCells(); + +if (cells == null) { +continue; +} + +// convert HBase cells to NiFi cells +final ResultCell[] resultCells = new ResultCell[cells.length]; +for (int i = 0; i < cells.length; i++) { +final Cell cell = cells[i]; +final ResultCell resultCell = getResultCell(cell); +resultCells[i] = resultCell; +} + +// delegate to the handler +handler.handle(rowKey, resultCells); +} +} + +} + +// +protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, +final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { +final Scan scan = new Scan(); +if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); +if (!StringUtils.isBlank(endRow)) scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8)); + + +Filter filter = null; +if (columns != null) { +for (Column col : columns) { +if (col.getQualifier() == null) { +scan.addFamily(col.getFamily()); +} else { +scan.addColumn(col.getFamily(), col.getQualifier()); +} +} +}else if (!StringUtils.isBlank(filterExpression)) { +ParseFilter parseFilter = new ParseFilter(); +filter = parseFilter.parseFilterString(filterExpression); +} +if (filter != null) scan.setFilter(filter); + +if (timerangeMin != null && timerangeMax != null) scan.setTimeRange(timerangeMin, timerangeMax); --- End diff -- Separate lines please. > NIFI-4833 Add ScanHBase processor > - > > Key: NIFI-4833 > URL: https://issues.apache.org/jira/browse/NIFI-4833 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Ed Berezitsky >Assignee: Ed Berezitsky >Priority: Major > > Add ScanHBase (new) processor to retrieve records from HBase tables. > Today there are GetHBase and FetchHBaseRow. 
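Separately from the style point, the limit guard in this diff hunk, `if (lim > 0 && cnt++ > lim) break;`, lets one row too many through: the post-increment means the loop breaks only after `lim + 1` rows have been handed to the handler. The merge commit for this PR lists "Fixed issue with limitRows", which likely addresses exactly this. A self-contained sketch of both variants (plain loops standing in for the `ResultScanner` iteration):

```java
import java.util.ArrayList;
import java.util.List;

public class LimitCheckSketch {

    // Reproduces the guard as written in the diff: post-increment, so the
    // break fires one iteration late and lim + 1 rows are processed.
    static List<Integer> scanAsInDiff(int totalRows, int lim) {
        List<Integer> handled = new ArrayList<>();
        int cnt = 0;
        for (int row = 0; row < totalRows; row++) {
            if (lim > 0 && cnt++ > lim) break; // guard from the diff
            handled.add(row);
        }
        return handled;
    }

    // Corrected guard: pre-increment stops at exactly lim rows.
    static List<Integer> scanCorrected(int totalRows, int lim) {
        List<Integer> handled = new ArrayList<>();
        int cnt = 0;
        for (int row = 0; row < totalRows; row++) {
            if (lim > 0 && ++cnt > lim) break; // stop once lim rows are handled
            handled.add(row);
        }
        return handled;
    }
}
```

On HBase 2.x and later, `Scan#setLimit` pushes the row limit to the server side instead of filtering client-side, which is what the commented-out "reserved for HBase v 2 or later" block in the diff anticipates.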
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371332#comment-16371332 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169622786 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end } } +@Override +public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, +final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, +final Collection columns, final ResultHandler handler) throws IOException { + +try (final Table table = connection.getTable(TableName.valueOf(tableName)); +final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, +timerangeMax, limitRows, isReversed, columns)) { + +int cnt = 0; +final int lim = limitRows != null ? 
limitRows : 0; +for (final Result result : scanner) { + +if (lim > 0 && cnt++ > lim) break; + +final byte[] rowKey = result.getRow(); +final Cell[] cells = result.rawCells(); + +if (cells == null) { +continue; +} + +// convert HBase cells to NiFi cells +final ResultCell[] resultCells = new ResultCell[cells.length]; +for (int i = 0; i < cells.length; i++) { +final Cell cell = cells[i]; +final ResultCell resultCell = getResultCell(cell); +resultCells[i] = resultCell; +} + +// delegate to the handler +handler.handle(rowKey, resultCells); +} +} + +} + +// +protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, +final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { +final Scan scan = new Scan(); +if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); +if (!StringUtils.isBlank(endRow)) scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8)); + + +Filter filter = null; +if (columns != null) { +for (Column col : columns) { +if (col.getQualifier() == null) { +scan.addFamily(col.getFamily()); +} else { +scan.addColumn(col.getFamily(), col.getQualifier()); +} +} +}else if (!StringUtils.isBlank(filterExpression)) { +ParseFilter parseFilter = new ParseFilter(); +filter = parseFilter.parseFilterString(filterExpression); +} +if (filter != null) scan.setFilter(filter); --- End diff -- Separate lines with curly brackets. > NIFI-4833 Add ScanHBase processor > - > > Key: NIFI-4833 > URL: https://issues.apache.org/jira/browse/NIFI-4833 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Ed Berezitsky >Assignee: Ed Berezitsky >Priority: Major > > Add ScanHBase (new) processor to retrieve records from HBase tables. > Today there are GetHBase and FetchHBaseRow. 
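For reference, the single-line guards flagged in this and the surrounding comments, rewritten in the braced multi-line style the reviewer is asking for. `ScanStub` below is a stand-in for `org.apache.hadoop.hbase.client.Scan` so the sketch compiles on its own; the real calls are `setTimeRange` and `setReversed` on the actual `Scan` object:

```java
public class BraceStyleSketch {

    // Minimal stand-in for org.apache.hadoop.hbase.client.Scan (assumption:
    // only the two setters discussed in the review).
    static class ScanStub {
        Long timeRangeMin;
        Long timeRangeMax;
        boolean reversed;
        void setTimeRange(long min, long max) { timeRangeMin = min; timeRangeMax = max; }
        void setReversed(boolean reversed) { this.reversed = reversed; }
    }

    // Same behavior as the one-line guards in the diff, reformatted into the
    // braced style requested by the review.
    static ScanStub configure(Long timerangeMin, Long timerangeMax, Boolean isReversed) {
        final ScanStub scan = new ScanStub();
        if (timerangeMin != null && timerangeMax != null) {
            scan.setTimeRange(timerangeMin, timerangeMax);
        }
        if (isReversed != null) {
            scan.setReversed(isReversed);
        }
        return scan;
    }
}
```

The behavior is unchanged; only the layout differs, which is why the review asks for it as a pure style fix.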
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371334#comment-16371334 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169622823 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end } } +@Override +public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, +final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, +final Collection columns, final ResultHandler handler) throws IOException { + +try (final Table table = connection.getTable(TableName.valueOf(tableName)); +final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, +timerangeMax, limitRows, isReversed, columns)) { + +int cnt = 0; +final int lim = limitRows != null ? 
limitRows : 0; +for (final Result result : scanner) { + +if (lim > 0 && cnt++ > lim) break; + +final byte[] rowKey = result.getRow(); +final Cell[] cells = result.rawCells(); + +if (cells == null) { +continue; +} + +// convert HBase cells to NiFi cells +final ResultCell[] resultCells = new ResultCell[cells.length]; +for (int i = 0; i < cells.length; i++) { +final Cell cell = cells[i]; +final ResultCell resultCell = getResultCell(cell); +resultCells[i] = resultCell; +} + +// delegate to the handler +handler.handle(rowKey, resultCells); +} +} + +} + +// +protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, +final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { +final Scan scan = new Scan(); +if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); +if (!StringUtils.isBlank(endRow)) scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8)); + + +Filter filter = null; +if (columns != null) { +for (Column col : columns) { +if (col.getQualifier() == null) { +scan.addFamily(col.getFamily()); +} else { +scan.addColumn(col.getFamily(), col.getQualifier()); +} +} +}else if (!StringUtils.isBlank(filterExpression)) { +ParseFilter parseFilter = new ParseFilter(); +filter = parseFilter.parseFilterString(filterExpression); +} +if (filter != null) scan.setFilter(filter); + +if (timerangeMin != null && timerangeMax != null) scan.setTimeRange(timerangeMin, timerangeMax); + +// ->>> reserved for HBase v 2 or later +//if (limitRows != null && limitRows > 0){ +//scan.setLimit(limitRows) +//} + +if (isReversed != null) scan.setReversed(isReversed); --- End diff -- Same as above. 
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371330#comment-16371330 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169622591 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end } } +@Override +public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, +final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, +final Collection columns, final ResultHandler handler) throws IOException { + +try (final Table table = connection.getTable(TableName.valueOf(tableName)); +final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, +timerangeMax, limitRows, isReversed, columns)) { + +int cnt = 0; +final int lim = limitRows != null ? 
limitRows : 0; +for (final Result result : scanner) { + +if (lim > 0 && cnt++ > lim) break; + +final byte[] rowKey = result.getRow(); +final Cell[] cells = result.rawCells(); + +if (cells == null) { +continue; +} + +// convert HBase cells to NiFi cells +final ResultCell[] resultCells = new ResultCell[cells.length]; +for (int i = 0; i < cells.length; i++) { +final Cell cell = cells[i]; +final ResultCell resultCell = getResultCell(cell); +resultCells[i] = resultCell; +} + +// delegate to the handler +handler.handle(rowKey, resultCells); +} +} + +} + +// +protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, +final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { +final Scan scan = new Scan(); +if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); +if (!StringUtils.isBlank(endRow)) scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8)); --- End diff -- See above. > NIFI-4833 Add ScanHBase processor > - > > Key: NIFI-4833 > URL: https://issues.apache.org/jira/browse/NIFI-4833 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Ed Berezitsky >Assignee: Ed Berezitsky >Priority: Major > > Add ScanHBase (new) processor to retrieve records from HBase tables. > Today there are GetHBase and FetchHBaseRow. GetHBase can pull entire table or > only new rows after processor started; it also must be scheduled and doesn't > support incoming . FetchHBaseRow can pull rows with known rowkeys only. > This processor could provide functionality similar to what could be reached > by using hbase shell, defining following properties: > -scan based on range of row key IDs > -scan based on range of time stamps > -limit number of records pulled > -use filters > -reverse rows -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371329#comment-16371329 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169622554 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end } } +@Override +public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, +final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, +final Collection columns, final ResultHandler handler) throws IOException { + +try (final Table table = connection.getTable(TableName.valueOf(tableName)); +final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, +timerangeMax, limitRows, isReversed, columns)) { + +int cnt = 0; +final int lim = limitRows != null ? 
limitRows : 0; +for (final Result result : scanner) { + +if (lim > 0 && cnt++ > lim) break; + +final byte[] rowKey = result.getRow(); +final Cell[] cells = result.rawCells(); + +if (cells == null) { +continue; +} + +// convert HBase cells to NiFi cells +final ResultCell[] resultCells = new ResultCell[cells.length]; +for (int i = 0; i < cells.length; i++) { +final Cell cell = cells[i]; +final ResultCell resultCell = getResultCell(cell); +resultCells[i] = resultCell; +} + +// delegate to the handler +handler.handle(rowKey, resultCells); +} +} + +} + +// +protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, +final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { +final Scan scan = new Scan(); +if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); --- End diff -- This should be on separate lines like this: ``` if (!StringUtils.isBlank(startRow)) { scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); } ``` > NIFI-4833 Add ScanHBase processor > - > > Key: NIFI-4833 > URL: https://issues.apache.org/jira/browse/NIFI-4833 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Ed Berezitsky >Assignee: Ed Berezitsky >Priority: Major > > Add ScanHBase (new) processor to retrieve records from HBase tables. > Today there are GetHBase and FetchHBaseRow. GetHBase can pull entire table or > only new rows after processor started; it also must be scheduled and doesn't > support incoming . FetchHBaseRow can pull rows with known rowkeys only. 
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371325#comment-16371325 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r169622126

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java ---
    @@ -0,0 +1,375 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +public class TestScanHBase {
    +
    +    private ScanHBase proc;
    +    private MockHBaseClientService hBaseClientService;
    +    private TestRunner runner;
    +
    +    @Before
    +    public void setup() throws InitializationException {
    +        proc = new ScanHBase();
    +        runner = TestRunners.newTestRunner(proc);
    +
    +        hBaseClientService = new MockHBaseClientService();
    +        runner.addControllerService("hbaseClient", hBaseClientService);
    +        runner.enableControllerService(hBaseClientService);
    +        runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
    +    }
    +
    +    @Test
    +    public void testColumnsValidation() {
    +        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
    +        runner.setProperty(ScanHBase.START_ROW, "row1");
    +        runner.setProperty(ScanHBase.END_ROW, "row1");
    +        runner.assertValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1");
    +        runner.assertValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1");
    +        runner.assertValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3");
    +        runner.assertValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3");
    +        runner.assertValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3");
    +        runner.assertNotValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3");
    +        runner.assertNotValid();
    +
    +        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,");
    +        runner.assertNotValid();
    +    }
    +
    +    @Test
    +    public void testNoIncomingFlowFile() {
    +        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
    +        runner.setProperty(ScanHBase.START_ROW, "row1");
    +        runner.setProperty(ScanHBase.END_ROW, "row1");
    +
    +        runner.run();
    +        runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
    +        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
    +        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
    +
    +        Assert.assertEquals(0, hBaseClientService.getNumScans());
    +    }
    +
    +    @Test
    +    public void testResultsNotFound() {
    +        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
    +        runner.setProperty(ScanHBase.START_ROW, "row1");
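The Columns cases exercised in testColumnsValidation above (comma-separated "cf" or "cf:cq" tokens, no spaces, no empty qualifiers, no trailing comma) can be expressed as one regular expression. The pattern below is a self-contained sketch inferred from those test cases, not ScanHBase's actual validator:

```java
import java.util.regex.Pattern;

public class ColumnsSyntaxSketch {
    // One "cf" or "cf:cq" token, then zero or more ",token" repeats.
    // Assumption: this mirrors the rule implied by the test cases only.
    private static final Pattern COLUMNS =
            Pattern.compile("\\w+(:\\w+)?(,\\w+(:\\w+)?)*");

    static boolean isValid(String columns) {
        // matches() anchors the pattern to the whole string, so a
        // trailing comma or embedded space makes the match fail.
        return COLUMNS.matcher(columns).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("cf1:cq1,cf2:cq2,cf3:cq3")); // true
        System.out.println(isValid("cf1,cf2:cq1,cf3"));         // true
        System.out.println(isValid("cf1 cf2,cf3"));             // false: space
        System.out.println(isValid("cf1:,cf2,cf3"));            // false: empty qualifier
        System.out.println(isValid("cf1:cq1,"));                // false: trailing comma
    }
}
```

Each assertValid/assertNotValid pair in the test maps directly onto one matches() call here.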
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371320#comment-16371320 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r169621787

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java ---
    @@ -0,0 +1,375 @@
    +    @Test
    +    public void testInvalidTableName() {
    +        runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}");

    --- End diff --

    Without calling `setVariable` and `setValidateExpressionLanguage` (think that's its name) I'm not sure if this is going to do more than test the raw value in that field. Granted, that is an invalid name.
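The review comment above is about whether `${hbase.table}` gets evaluated at all: an Expression Language reference with no variable bound to it passes through as a literal string, which is why the test only exercises the raw value. The method names the reviewer half-remembers are assumptions here too (in nifi-mock they are plausibly `setVariable` and `setValidateExpressionUsage`, but verify against the TestRunner API). The pass-through behavior itself can be sketched in plain Java, with a toy resolver standing in for NiFi's real EL engine:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ElResolutionSketch {
    private static final Pattern REF = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replace ${name} with a bound value; leave the reference literal when
    // unbound. This is an illustrative stand-in, not NiFi's EL implementation.
    static String resolve(String value, Map<String, String> vars) {
        Matcher m = REF.matcher(value);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String bound = vars.get(m.group(1));
            // quoteReplacement: the fallback text itself contains '$'
            m.appendReplacement(sb,
                    Matcher.quoteReplacement(bound != null ? bound : m.group(0)));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        // No binding: the processor would see the literal, invalid table name.
        System.out.println(resolve("${hbase.table}", Map.of()));
        // With a binding, the reference resolves to a real table name.
        System.out.println(resolve("${hbase.table}", Map.of("hbase.table", "table1")));
    }
}
```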
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16369549#comment-16369549 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bdesert commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r169176478

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    @@ -0,0 +1,562 @@
    +package org.apache.nifi.hbase;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.hbase.io.JsonFullRowSerializer;
    +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
    +import org.apache.nifi.hbase.io.RowSerializer;
    +import org.apache.nifi.hbase.scan.Column;
    +import org.apache.nifi.hbase.scan.ResultCell;
    +import org.apache.nifi.hbase.scan.ResultHandler;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.Tuple;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"hbase", "scan", "fetch", "get"})
    +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from an HBase table "
    +        + "by specifying a range of rowkey values (start and/or end), by time range, by filter expression, or any combination of them.\n"
    +        + "Order of records can be controlled by the Reversed property. "
    +        + "The number of rows retrieved by the processor can be limited.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
    +        @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."),
    +        @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"),
    +        @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"),
    +        @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. "
    +                + "Could be null (not present) if transferred to FAILURE")
    +})
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16369113#comment-16369113 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2478#discussion_r169072654

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    @@ -0,0 +1,562 @@
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368391#comment-16368391 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2478

    I'll try to take a look this weekend.

> NIFI-4833 Add ScanHBase processor
> ---------------------------------
>
>                 Key: NIFI-4833
>                 URL: https://issues.apache.org/jira/browse/NIFI-4833
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Ed Berezitsky
>            Assignee: Ed Berezitsky
>            Priority: Major
>
> Add ScanHBase (new) processor to retrieve records from HBase tables.
> Today there are GetHBase and FetchHBaseRow. GetHBase can pull an entire table or
> only new rows after the processor started; it also must be scheduled and doesn't
> support incoming . FetchHBaseRow can pull rows with known rowkeys only.
> This processor could provide functionality similar to what could be reached
> by using hbase shell, defining the following properties:
> - scan based on a range of row key IDs
> - scan based on a range of time stamps
> - limit the number of records pulled
> - use filters
> - reverse rows

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368385#comment-16368385 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bdesert commented on the issue:

    https://github.com/apache/nifi/pull/2478

    @MikeThomsen, are you available to re-review this PR? I have addressed your comment regarding the branch and the rest (except for labels, which can be added later in bulk for all the HBase related processors).
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368384#comment-16368384 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

GitHub user bdesert opened a pull request:

    https://github.com/apache/nifi/pull/2478

    NIFI-4833 Add ScanHBase Processor

    Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
    - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    - [ ] Is your initial contribution a single, squashed commit?

    ### For code changes:
    - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [x] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bdesert/nifi NIFI-4833-Add-ScanHBase-processor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2478.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2478

----
commit 39bd6fb5d02eb7dca63830967823a4bb48c5712c
Author: Ed
Date:   2018-02-17T21:26:04Z

    Add ScanHBase Processor

    New processor for scanning HBase records based on various params like range of rowkeys, range of timestamps. Supports result limit and reverse scan.

commit d2f5410be14a77f64e7ca5593e6c908620a8da58
Author: Ed
Date:   2018-02-17T21:27:18Z

    Adds Atlas Support for ScanHBase processor
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368349#comment-16368349 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bdesert closed the pull request at:

    https://github.com/apache/nifi/pull/2446
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349091#comment-16349091 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bdesert commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2446#discussion_r165449601

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    @@ -0,0 +1,564 @@
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349089#comment-16349089 ]

ASF GitHub Bot commented on NIFI-4833:
--------------------------------------

Github user bdesert commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2446#discussion_r165449486

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
    @@ -0,0 +1,564 @@
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349079#comment-16349079 ] ASF GitHub Bot commented on NIFI-4833: -- Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2446#discussion_r165448494

--- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonFullRowSerializer;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hbase", "scan", "fetch", "get"})
+@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end ),"
+        + "by time range, by filter expression, or any combination of them. \n"
+        + "Order of records can be controlled by a property Reversed"
+        + "Number of rows retrieved by the processor can be limited.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
+    @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."),
+    @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"),
+    @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"),
+    @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. Could be null (not present) if transfered to FAILURE")
+})
+public class ScanHBase exte
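The options the capability description above lists (rowkey range, time range, reversed order, row limit) can be sketched against a plain in-memory sorted map. Everything below — the class, record, and method names — is illustrative only, not the processor's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Illustrative model of ScanHBase's scan options; not the real processor code. */
public class ScanSketch {

    /** A cell value plus its write timestamp, as HBase stores versions. */
    public record Cell(String value, long timestamp) {}

    public static List<String> scan(NavigableMap<String, Cell> table,
                                    String startRow, String endRow,
                                    long minTime, long maxTime,
                                    boolean reversed, int limit) {
        // Rowkey range: start inclusive, end exclusive, like an HBase Scan.
        NavigableMap<String, Cell> range = table.subMap(startRow, true, endRow, false);
        if (reversed) {
            range = range.descendingMap(); // the "Reversed" property
        }
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, Cell> e : range.entrySet()) {
            Cell c = e.getValue();
            // Time-range filter: keep cells whose timestamp is in [minTime, maxTime)
            if (c.timestamp() < minTime || c.timestamp() >= maxTime) {
                continue;
            }
            rows.add(e.getKey() + "=" + c.value());
            if (rows.size() >= limit) { // the row-limit property
                break;
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        NavigableMap<String, Cell> table = new TreeMap<>();
        table.put("row-1", new Cell("a", 100L));
        table.put("row-2", new Cell("b", 200L));
        table.put("row-3", new Cell("c", 300L));
        table.put("row-4", new Cell("d", 400L));
        // Reversed scan over [row-1, row-4), limited to 2 rows
        System.out.println(scan(table, "row-1", "row-4", 100L, 400L, true, 2));
        // prints [row-3=c, row-2=b]
    }
}
```

The real processor applies these constraints server-side through the HBase client, but the selection semantics are the same.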
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349069#comment-16349069 ] ASF GitHub Bot commented on NIFI-4833: -- Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2446#discussion_r165447440 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348556#comment-16348556 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2446

I have a patch that I've been working on for adding support for HBase visibility labels to the existing processors. Might want to think about how to integrate that into this processor.

> NIFI-4833 Add ScanHBase processor
> -
>
> Key: NIFI-4833
> URL: https://issues.apache.org/jira/browse/NIFI-4833
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Ed Berezitsky
> Assignee: Ed Berezitsky
> Priority: Major
>
> Add ScanHBase (new) processor to retrieve records from HBase tables.
> Today there are GetHBase and FetchHBaseRow. GetHBase can pull an entire table or only new rows after the processor started; it also must be scheduled and doesn't support incoming . FetchHBaseRow can pull rows with known rowkeys only.
> This processor could provide functionality similar to what can be reached by using the hbase shell, defining the following properties:
> - scan based on range of row key IDs
> - scan based on range of time stamps
> - limit number of records pulled
> - use filters
> - reverse rows

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
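The commit log for this issue mentions an updated min/max timestamp custom validation. The cross-field check that such a pair of optional bound properties needs can be sketched as follows; the class name and message text are invented for illustration, not the processor's actual code:

```java
// Hypothetical sketch of a min/max timestamp cross-check for a pair of
// optional scan-bound properties; names and messages are invented.
public class TimeRangeCheck {

    /** Returns an error message, or null when the pair of bounds is valid. */
    public static String validate(Long minTimestamp, Long maxTimestamp) {
        if (minTimestamp == null || maxTimestamp == null) {
            return null; // each bound is optional on its own
        }
        if (minTimestamp > maxTimestamp) {
            return "Min Timestamp must not be greater than Max Timestamp";
        }
        return null;
    }

    public static void main(String[] args) {
        // An inverted range is the one combination that must be rejected.
        System.out.println(validate(200L, 100L));
    }
}
```

In NiFi itself this kind of check lives in the processor's `customValidate(ValidationContext)` override, since per-property validators cannot compare two properties against each other.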
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348553#comment-16348553 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2446#discussion_r165338502 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348552#comment-16348552 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2446#discussion_r165304550 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348550#comment-16348550 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2446#discussion_r165304841 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348551#comment-16348551 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2446#discussion_r165304629 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348308#comment-16348308 ] ASF GitHub Bot commented on NIFI-4833: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2446

I'll take a stab at reviewing, but you should git cherry-pick this onto a git branch and not do it off your master branch.
[jira] [Commented] (NIFI-4833) NIFI-4833 Add ScanHBase processor
[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347776#comment-16347776 ] ASF GitHub Bot commented on NIFI-4833: -- GitHub user bdesert opened a pull request: https://github.com/apache/nifi/pull/2446 NIFI-4833 Add ScanHBase processor

### Description:
Add new processor ScanHBase and a test package.

Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [v] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [v] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [v] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [v] Is your initial contribution a single, squashed commit?

### For code changes:
- [v] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [v] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [v] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bdesert/nifi master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2446.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2446

commit 7a9dc565f43284a8535de052a812a252c6950613
Author: Ed
Date: 2018-01-31T21:20:35Z

    NIFI-4833 Add ScanHBase processor