[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2478 ---
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r170700319 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. " ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { +//enhanced regex for columns to allow "-" in column qualifier names +static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); +static final
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r170697126 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. " ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { +//enhanced regex for columns to allow "-" in column qualifier names +static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); +static final
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r170684472 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. " ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { +//enhanced regex for columns to allow "-" in column qualifier names +static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); +static final
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r170683857 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. " ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), --- End diff -- Can this be removed? Doesn't look like we allow writing the results to an attribute, which is a good thing :) ---
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r170688235 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. " ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { +//enhanced regex for columns to allow "-" in column qualifier names +static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); +static final
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r170687174 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. " ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { +//enhanced regex for columns to allow "-" in column qualifier names +static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); +static final
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r170686168 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. " ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { +//enhanced regex for columns to allow "-" in column qualifier names +static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); +static final
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169749383 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. \n" ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { +//enhanced regex for columns to allow "-" in column qualifier names +static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); +static
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169732184 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) --- End diff -- I was 
thinking about that before. Couldn't really decide, and then I took a look at DeleteHBaseRow and FetchHBaseRow and decided to keep it consistent. ---
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169680760 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java --- @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestScanHBase { + +private ScanHBase proc; +private MockHBaseClientService hBaseClientService; +private TestRunner runner; + +@Before +public void setup() throws InitializationException { +proc = new ScanHBase(); +runner = TestRunners.newTestRunner(proc); + +hBaseClientService = new MockHBaseClientService(); +runner.addControllerService("hbaseClient", hBaseClientService); +runner.enableControllerService(hBaseClientService); +runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient"); +} + +@Test +public void testColumnsValidation() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,"); +runner.assertNotValid(); +} + +@Test +public void testNoIncomingFlowFile() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); + +runner.run(); +runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); 
+runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); +runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0); + +Assert.assertEquals(0, hBaseClientService.getNumScans()); +} + +@Test +public void testInvalidTableName() { +runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); + +runner.enqueue("trigger flow file"); +runner.run(); + +runner.assertTransferCount(ScanHBase.REL_FAILURE, 1); +runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); +runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0); + +Assert.assertEquals(0, hBaseClientService.getNumScans()); +} + +@Test +public void testResultsNotFound() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); + +runner.enqueue("trigger flow file"); +runner.run(); + +runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); +
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169677516 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java --- @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestScanHBase { + +private ScanHBase proc; +private MockHBaseClientService hBaseClientService; +private TestRunner runner; + +@Before +public void setup() throws InitializationException { +proc = new ScanHBase(); +runner = TestRunners.newTestRunner(proc); + +hBaseClientService = new MockHBaseClientService(); +runner.addControllerService("hbaseClient", hBaseClientService); +runner.enableControllerService(hBaseClientService); +runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient"); +} + +@Test +public void testColumnsValidation() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,"); +runner.assertNotValid(); +} + +@Test +public void testNoIncomingFlowFile() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); + +runner.run(); +runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); 
+runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); +runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0); + +Assert.assertEquals(0, hBaseClientService.getNumScans()); +} + +@Test +public void testInvalidTableName() { +runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); --- End diff -- Not setting a value for "hbase.table" is intentional. This test is for failure handling if expression is invalid (cannot be evaluated). You can see that FF expected at REL_FAILURE without scans. If I just didn't understand what you meant, please let me know. ---
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169624523 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. \n" ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { +//enhanced regex for columns to allow "-" in column qualifier names +static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); +
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169622730 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end } } +@Override +public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, +final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, +final Collection columns, final ResultHandler handler) throws IOException { + +try (final Table table = connection.getTable(TableName.valueOf(tableName)); +final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, +timerangeMax, limitRows, isReversed, columns)) { + +int cnt = 0; +final int lim = limitRows != null ? 
limitRows : 0; +for (final Result result : scanner) { + +if (lim > 0 && cnt++ > lim) break; + +final byte[] rowKey = result.getRow(); +final Cell[] cells = result.rawCells(); + +if (cells == null) { +continue; +} + +// convert HBase cells to NiFi cells +final ResultCell[] resultCells = new ResultCell[cells.length]; +for (int i = 0; i < cells.length; i++) { +final Cell cell = cells[i]; +final ResultCell resultCell = getResultCell(cell); +resultCells[i] = resultCell; +} + +// delegate to the handler +handler.handle(rowKey, resultCells); +} +} + +} + +// +protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, +final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { +final Scan scan = new Scan(); +if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); +if (!StringUtils.isBlank(endRow)) scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8)); + + +Filter filter = null; +if (columns != null) { +for (Column col : columns) { +if (col.getQualifier() == null) { +scan.addFamily(col.getFamily()); +} else { +scan.addColumn(col.getFamily(), col.getQualifier()); +} +} +}else if (!StringUtils.isBlank(filterExpression)) { +ParseFilter parseFilter = new ParseFilter(); +filter = parseFilter.parseFilterString(filterExpression); +} +if (filter != null) scan.setFilter(filter); + +if (timerangeMin != null && timerangeMax != null) scan.setTimeRange(timerangeMin, timerangeMax); --- End diff -- Separate lines please. ---
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169623164 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) --- End diff -- Are you 
sure you don't want to do `INPUT_ALLOWED`? I can see good reasons not to do that. If you want to make it optional, see `ExecuteSQL` for some code you can steal to make it not do processing on flowfile input AND time interval. ---
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169622786 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end } } +@Override +public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, +final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, +final Collection columns, final ResultHandler handler) throws IOException { + +try (final Table table = connection.getTable(TableName.valueOf(tableName)); +final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, +timerangeMax, limitRows, isReversed, columns)) { + +int cnt = 0; +final int lim = limitRows != null ? 
limitRows : 0; +for (final Result result : scanner) { + +if (lim > 0 && cnt++ > lim) break; + +final byte[] rowKey = result.getRow(); +final Cell[] cells = result.rawCells(); + +if (cells == null) { +continue; +} + +// convert HBase cells to NiFi cells +final ResultCell[] resultCells = new ResultCell[cells.length]; +for (int i = 0; i < cells.length; i++) { +final Cell cell = cells[i]; +final ResultCell resultCell = getResultCell(cell); +resultCells[i] = resultCell; +} + +// delegate to the handler +handler.handle(rowKey, resultCells); +} +} + +} + +// +protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, +final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { +final Scan scan = new Scan(); +if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); +if (!StringUtils.isBlank(endRow)) scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8)); + + +Filter filter = null; +if (columns != null) { +for (Column col : columns) { +if (col.getQualifier() == null) { +scan.addFamily(col.getFamily()); +} else { +scan.addColumn(col.getFamily(), col.getQualifier()); +} +} +}else if (!StringUtils.isBlank(filterExpression)) { +ParseFilter parseFilter = new ParseFilter(); +filter = parseFilter.parseFilterString(filterExpression); +} +if (filter != null) scan.setFilter(filter); --- End diff -- Separate lines with curly brackets. ---
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169622823 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end } } +@Override +public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, +final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, +final Collection columns, final ResultHandler handler) throws IOException { + +try (final Table table = connection.getTable(TableName.valueOf(tableName)); +final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, +timerangeMax, limitRows, isReversed, columns)) { + +int cnt = 0; +final int lim = limitRows != null ? 
limitRows : 0; +for (final Result result : scanner) { + +if (lim > 0 && cnt++ > lim) break; + +final byte[] rowKey = result.getRow(); +final Cell[] cells = result.rawCells(); + +if (cells == null) { +continue; +} + +// convert HBase cells to NiFi cells +final ResultCell[] resultCells = new ResultCell[cells.length]; +for (int i = 0; i < cells.length; i++) { +final Cell cell = cells[i]; +final ResultCell resultCell = getResultCell(cell); +resultCells[i] = resultCell; +} + +// delegate to the handler +handler.handle(rowKey, resultCells); +} +} + +} + +// +protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, +final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { +final Scan scan = new Scan(); +if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); +if (!StringUtils.isBlank(endRow)) scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8)); + + +Filter filter = null; +if (columns != null) { +for (Column col : columns) { +if (col.getQualifier() == null) { +scan.addFamily(col.getFamily()); +} else { +scan.addColumn(col.getFamily(), col.getQualifier()); +} +} +}else if (!StringUtils.isBlank(filterExpression)) { +ParseFilter parseFilter = new ParseFilter(); +filter = parseFilter.parseFilterString(filterExpression); +} +if (filter != null) scan.setFilter(filter); + +if (timerangeMin != null && timerangeMax != null) scan.setTimeRange(timerangeMin, timerangeMax); + +// ->>> reserved for HBase v 2 or later +//if (limitRows != null && limitRows > 0){ +//scan.setLimit(limitRows) +//} + +if (isReversed != null) scan.setReversed(isReversed); --- End diff -- Same as above. ---
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169622591 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end } } +@Override +public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, +final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, +final Collection columns, final ResultHandler handler) throws IOException { + +try (final Table table = connection.getTable(TableName.valueOf(tableName)); +final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, +timerangeMax, limitRows, isReversed, columns)) { + +int cnt = 0; +final int lim = limitRows != null ? 
limitRows : 0; +for (final Result result : scanner) { + +if (lim > 0 && cnt++ > lim) break; + +final byte[] rowKey = result.getRow(); +final Cell[] cells = result.rawCells(); + +if (cells == null) { +continue; +} + +// convert HBase cells to NiFi cells +final ResultCell[] resultCells = new ResultCell[cells.length]; +for (int i = 0; i < cells.length; i++) { +final Cell cell = cells[i]; +final ResultCell resultCell = getResultCell(cell); +resultCells[i] = resultCell; +} + +// delegate to the handler +handler.handle(rowKey, resultCells); +} +} + +} + +// +protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, +final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { +final Scan scan = new Scan(); +if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); +if (!StringUtils.isBlank(endRow)) scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8)); --- End diff -- See above. ---
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169622554 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end } } +@Override +public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, +final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, +final Collection columns, final ResultHandler handler) throws IOException { + +try (final Table table = connection.getTable(TableName.valueOf(tableName)); +final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, +timerangeMax, limitRows, isReversed, columns)) { + +int cnt = 0; +final int lim = limitRows != null ? 
limitRows : 0; +for (final Result result : scanner) { + +if (lim > 0 && cnt++ > lim) break; + +final byte[] rowKey = result.getRow(); +final Cell[] cells = result.rawCells(); + +if (cells == null) { +continue; +} + +// convert HBase cells to NiFi cells +final ResultCell[] resultCells = new ResultCell[cells.length]; +for (int i = 0; i < cells.length; i++) { +final Cell cell = cells[i]; +final ResultCell resultCell = getResultCell(cell); +resultCells[i] = resultCell; +} + +// delegate to the handler +handler.handle(rowKey, resultCells); +} +} + +} + +// +protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, +final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { +final Scan scan = new Scan(); +if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); --- End diff -- This should be on separate lines like this: ``` if (!StringUtils.isBlank(startRow)) { scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); } ``` ---
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169622126 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java --- @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestScanHBase { + +private ScanHBase proc; +private MockHBaseClientService hBaseClientService; +private TestRunner runner; + +@Before +public void setup() throws InitializationException { +proc = new ScanHBase(); +runner = TestRunners.newTestRunner(proc); + +hBaseClientService = new MockHBaseClientService(); +runner.addControllerService("hbaseClient", hBaseClientService); +runner.enableControllerService(hBaseClientService); +runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient"); +} + +@Test +public void testColumnsValidation() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,"); +runner.assertNotValid(); +} + +@Test +public void testNoIncomingFlowFile() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); + +runner.run(); +runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); 
+runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); +runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0); + +Assert.assertEquals(0, hBaseClientService.getNumScans()); +} + +@Test +public void testInvalidTableName() { +runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); + +runner.enqueue("trigger flow file"); +runner.run(); + +runner.assertTransferCount(ScanHBase.REL_FAILURE, 1); +runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); +runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0); + +Assert.assertEquals(0, hBaseClientService.getNumScans()); +} + +@Test +public void testResultsNotFound() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); + +runner.enqueue("trigger flow file"); +runner.run(); + +runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); +
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169621787 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java --- @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestScanHBase { + +private ScanHBase proc; +private MockHBaseClientService hBaseClientService; +private TestRunner runner; + +@Before +public void setup() throws InitializationException { +proc = new ScanHBase(); +runner = TestRunners.newTestRunner(proc); + +hBaseClientService = new MockHBaseClientService(); +runner.addControllerService("hbaseClient", hBaseClientService); +runner.enableControllerService(hBaseClientService); +runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient"); +} + +@Test +public void testColumnsValidation() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3"); +runner.assertValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3"); +runner.assertNotValid(); + +runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,"); +runner.assertNotValid(); +} + +@Test +public void testNoIncomingFlowFile() { +runner.setProperty(ScanHBase.TABLE_NAME, "table1"); +runner.setProperty(ScanHBase.START_ROW, "row1"); +runner.setProperty(ScanHBase.END_ROW, "row1"); + +runner.run(); +runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); 
+runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); +runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0); + +Assert.assertEquals(0, hBaseClientService.getNumScans()); +} + +@Test +public void testInvalidTableName() { +runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); --- End diff -- Without calling `setVariable` and `setValidateExpressionLanguage` (think that's its name) I'm not sure if this is going to do more than test the raw value in that field. Granted, that is an invalid name. ---
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169176478 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. \n" ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { +//enhanced regex for columns to allow "-" in column qualifier names +static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); +static
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169072654 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," ++ "by time range, by filter expression, or any combination of them. \n" ++ "Order of records can be controlled by a property Reversed" ++ "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ +@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), +@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), +@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), +@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), +@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. " ++ "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { +//enhanced regex for columns to allow "-" in column qualifier names +static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); +
[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor
GitHub user bdesert opened a pull request: https://github.com/apache/nifi/pull/2478 NIFI-4833 Add scanHBase Processor Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdesert/nifi NIFI-4833-Add-ScanHBase-processor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2478.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2478 commit 39bd6fb5d02eb7dca63830967823a4bb48c5712c Author: Ed Date: 2018-02-17T21:26:04Z Add ScanHBase Processor New processor for scanning HBase records based on various params like range of rowkeys, range of timestamps. Supports result limit and reverse scan. commit d2f5410be14a77f64e7ca5593e6c908620a8da58 Author: Ed Date: 2018-02-17T21:27:18Z Adds Atlas Support for ScanHBase processor Adds Atlas Support for ScanHBase processor ---