[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348551#comment-16348551 ]
ASF GitHub Bot commented on NIFI-4833: -------------------------------------- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2446#discussion_r165304629 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", "fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," + + "by time range, by filter expression, or any combination of them. \n" + + "Order of records can be controlled by a property <code>Reversed</code>" + + "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ + @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), + @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), + @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), + @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), + @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. <br/>Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { + //enhanced regex for columns to allow "-" in column qualifier names + static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); + static final byte[] nl = System.lineSeparator().getBytes(); + + static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder() + .displayName("HBase Client Service") + .name("scanhbase-client-service") + .description("Specifies the Controller Service to use for accessing HBase.") + .required(true) + .identifiesControllerService(HBaseClientService.class) + .build(); + + static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .displayName("Table Name") + .name("scanhbase-table-name") + .description("The name of the HBase Table to fetch from.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor START_ROW = new PropertyDescriptor.Builder() + .displayName("Start rowkey") + .name("scanhbase-start-rowkey") + .description("The rowkey to start scan from.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor END_ROW = new PropertyDescriptor.Builder() + .displayName("End rowkey") + .name("scanhbase-end-rowkey") + .description("The row key to end scan by.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor TIME_RANGE_MIN = new PropertyDescriptor.Builder() + .displayName("Time range min") + .name("scanhbase-time-range-min") + .description("Time range min value. Both min and max values for time range should be either blank or provided.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.LONG_VALIDATOR) + .build(); + + static final PropertyDescriptor TIME_RANGE_MAX = new PropertyDescriptor.Builder() + .displayName("Time range max") + .name("scanhbase-time-range-max") + .description("Time range max value. Both min and max values for time range should be either blank or provided.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.LONG_VALIDATOR) + .build(); + + static final PropertyDescriptor LIMIT_ROWS = new PropertyDescriptor.Builder() + .displayName("Limit rows") + .name("scanhbase-limit") + .description("Limit number of rows retrieved by scan.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor BULK_SIZE = new PropertyDescriptor.Builder() + .displayName("Max rows per flow file") + .name("scanhbase-bulk-size") + .description("Limits number of rows in single flow file content. Set to 0 to avoid multiple flow files.") + .required(false) + .expressionLanguageSupported(true) + .defaultValue("0") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + + static final PropertyDescriptor REVERSED_SCAN = new PropertyDescriptor.Builder() + .displayName("Reversed order") + .name("scanhbase-reversed-order") + .description("Set whether this scan is a reversed one. This is false by default which means forward(normal) scan.") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .required(false) + .defaultValue("false") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) --- End diff -- Should be BOOLEAN_VALIDATOR > NIFI-4833 Add ScanHBase processor > --------------------------------- > > Key: NIFI-4833 > URL: https://issues.apache.org/jira/browse/NIFI-4833 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Ed Berezitsky > Assignee: Ed Berezitsky > Priority: Major > > Add ScanHBase (new) processor to retrieve records from HBase tables. > Today there are GetHBase and FetchHBaseRow. GetHBase can pull entire table or > only new rows after processor started; it also must be scheduled and doesn't > support incoming . FetchHBaseRow can pull rows with known rowkeys only. > This processor could provide functionality similar to what could be reached > by using hbase shell, defining following properties: > -scan based on range of row key IDs > -scan based on range of time stamps > -limit number of records pulled > -use filters > -reverse rows -- This message was sent by Atlassian JIRA (v7.6.3#76005)