Github user bdesert commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2446#discussion_r165448494
  
    --- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
 ---
    @@ -0,0 +1,564 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.hbase.io.JsonFullRowSerializer;
    +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
    +import org.apache.nifi.hbase.io.RowSerializer;
    +import org.apache.nifi.hbase.scan.Column;
    +import org.apache.nifi.hbase.scan.ResultCell;
    +import org.apache.nifi.hbase.scan.ResultHandler;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.Tuple;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.regex.Pattern;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"hbase", "scan", "fetch", "get"})
    +@CapabilityDescription("Scans and fetches rows from an HBase table. This 
processor may be used to fetch rows from hbase table by specifying a range of 
rowkey values (start and/or end ),"
    +        + "by time range, by filter expression, or any combination of 
them. \n"
    +        + "Order of records can be controlled by a property 
<code>Reversed</code>"
    +        + "Number of rows retrieved by the processor can be limited.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hbase.table", description = "The 
name of the HBase table that the row was fetched from"),
    +        @WritesAttribute(attribute = "hbase.resultset", description = "A 
JSON document/s representing the row/s. This property is only written when a 
Destination of flowfile-attributes is selected."),
    +        @WritesAttribute(attribute = "mime.type", description = "Set to 
application/json when using a Destination of flowfile-content, not set or 
modified otherwise"),
    +        @WritesAttribute(attribute = "hbase.rows.count", description = 
"Number of rows in the content of given flow file"),
    +        @WritesAttribute(attribute = "scanhbase.results.found", 
description = "Indicates whether at least one row has been found in given hbase 
table with provided conditions. <br/>Could be null (not present) if transfered 
to FAILURE")
    +})
    +public class ScanHBase extends AbstractProcessor {
    +   //enhanced regex for columns to allow "-" in column qualifier names
    +    static final Pattern COLUMNS_PATTERN = 
Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");
    +    static final byte[] nl = System.lineSeparator().getBytes();
    +
    +    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
    +            .displayName("HBase Client Service")
    +            .name("scanhbase-client-service")
    +            .description("Specifies the Controller Service to use for 
accessing HBase.")
    +            .required(true)
    +            .identifiesControllerService(HBaseClientService.class)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
    +            .displayName("Table Name")
    +            .name("scanhbase-table-name")
    +            .description("The name of the HBase Table to fetch from.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor START_ROW = new 
PropertyDescriptor.Builder()
    +            .displayName("Start rowkey")
    +            .name("scanhbase-start-rowkey")
    +            .description("The rowkey to start scan from.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor END_ROW = new 
PropertyDescriptor.Builder()
    +            .displayName("End rowkey")
    +            .name("scanhbase-end-rowkey")
    +            .description("The row key to end scan by.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor TIME_RANGE_MIN = new 
PropertyDescriptor.Builder()
    +            .displayName("Time range min")
    +            .name("scanhbase-time-range-min")
    +            .description("Time range min value. Both min and max values 
for time range should be either blank or provided.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.LONG_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor TIME_RANGE_MAX = new 
PropertyDescriptor.Builder()
    +            .displayName("Time range max")
    +            .name("scanhbase-time-range-max")
    +            .description("Time range max value. Both min and max values 
for time range should be either blank or provided.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.LONG_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor LIMIT_ROWS = new 
PropertyDescriptor.Builder()
    +            .displayName("Limit rows")
    +            .name("scanhbase-limit")
    +            .description("Limit number of rows retrieved by scan.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor BULK_SIZE = new 
PropertyDescriptor.Builder()
    +            .displayName("Max rows per flow file")
    +            .name("scanhbase-bulk-size")
    +            .description("Limits number of rows in single flow file 
content. Set to 0 to avoid multiple flow files.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +    
    +    
    +    static final PropertyDescriptor REVERSED_SCAN = new 
PropertyDescriptor.Builder()
    +            .displayName("Reversed order")
    +            .name("scanhbase-reversed-order")
    +            .description("Set whether this scan is a reversed one. This is 
false by default which means forward(normal) scan.")
    +            .expressionLanguageSupported(false)
    +            .allowableValues("true", "false")
    +            .required(false)
    +            .defaultValue("false")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    
    +    static final PropertyDescriptor FILTER_EXPRESSION = new 
PropertyDescriptor.Builder()
    +            .displayName("Filter expression")
    +            .name("scanhbase-filter-expression")
    +            .description("An HBase filter expression that will be applied 
to the scan. This property can not be used when also using the Columns 
property.")
    --- End diff --
    
    It's basically the regular syntax of HBase shell filters. I don't parse them; 
I just pass them to the HBase client as is. So if a future version changes the 
filter that I provide as an example, that example will no longer be valid, 
while the processor itself will still be OK. That was my concern. If you still 
think it would be better to have an example, I'll add it. Let me know.


---

Reply via email to