[ https://issues.apache.org/jira/browse/NIFI-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349079#comment-16349079 ]
ASF GitHub Bot commented on NIFI-4833: -------------------------------------- Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2446#discussion_r165448494 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," + + "by time range, by filter expression, or any combination of them. \n" + + "Order of records can be controlled by a property <code>Reversed</code>" + + "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ + @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), + @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), + @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), + @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), + @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. 
<br/>Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { + //enhanced regex for columns to allow "-" in column qualifier names + static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); + static final byte[] nl = System.lineSeparator().getBytes(); + + static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder() + .displayName("HBase Client Service") + .name("scanhbase-client-service") + .description("Specifies the Controller Service to use for accessing HBase.") + .required(true) + .identifiesControllerService(HBaseClientService.class) + .build(); + + static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .displayName("Table Name") + .name("scanhbase-table-name") + .description("The name of the HBase Table to fetch from.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor START_ROW = new PropertyDescriptor.Builder() + .displayName("Start rowkey") + .name("scanhbase-start-rowkey") + .description("The rowkey to start scan from.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor END_ROW = new PropertyDescriptor.Builder() + .displayName("End rowkey") + .name("scanhbase-end-rowkey") + .description("The row key to end scan by.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor TIME_RANGE_MIN = new PropertyDescriptor.Builder() + .displayName("Time range min") + .name("scanhbase-time-range-min") + .description("Time range min value. 
Both min and max values for time range should be either blank or provided.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.LONG_VALIDATOR) + .build(); + + static final PropertyDescriptor TIME_RANGE_MAX = new PropertyDescriptor.Builder() + .displayName("Time range max") + .name("scanhbase-time-range-max") + .description("Time range max value. Both min and max values for time range should be either blank or provided.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.LONG_VALIDATOR) + .build(); + + static final PropertyDescriptor LIMIT_ROWS = new PropertyDescriptor.Builder() + .displayName("Limit rows") + .name("scanhbase-limit") + .description("Limit number of rows retrieved by scan.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor BULK_SIZE = new PropertyDescriptor.Builder() + .displayName("Max rows per flow file") + .name("scanhbase-bulk-size") + .description("Limits number of rows in single flow file content. Set to 0 to avoid multiple flow files.") + .required(false) + .expressionLanguageSupported(true) + .defaultValue("0") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + + static final PropertyDescriptor REVERSED_SCAN = new PropertyDescriptor.Builder() + .displayName("Reversed order") + .name("scanhbase-reversed-order") + .description("Set whether this scan is a reversed one. 
This is false by default which means forward(normal) scan.") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .required(false) + .defaultValue("false") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor FILTER_EXPRESSION = new PropertyDescriptor.Builder() + .displayName("Filter expression") + .name("scanhbase-filter-expression") + .description("An HBase filter expression that will be applied to the scan. This property can not be used when also using the Columns property.") --- End diff -- It's basically the regular syntax of HBase shell filters. I don't parse them and just pass them to the HBase client as is. So if there is any change to the filter syntax in a future version, the example I'd provide won't be valid, while the processor itself will still be OK. That was my concern. If you still think it would be better to have an example, I'll add it. Let me know. > NIFI-4833 Add ScanHBase processor > --------------------------------- > > Key: NIFI-4833 > URL: https://issues.apache.org/jira/browse/NIFI-4833 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Ed Berezitsky > Assignee: Ed Berezitsky > Priority: Major > > Add ScanHBase (new) processor to retrieve records from HBase tables. > Today there are GetHBase and FetchHBaseRow. GetHBase can pull an entire table or > only new rows after the processor started; it also must be scheduled and doesn't > support incoming flow files. FetchHBaseRow can pull rows with known rowkeys only. > This processor could provide functionality similar to what could be achieved > by using the HBase shell, defining the following properties: > -scan based on range of row key IDs > -scan based on range of time stamps > -limit number of records pulled > -use filters > -reverse rows -- This message was sent by Atlassian JIRA (v7.6.3#76005)