Github user bdesert commented on a diff in the pull request: https://github.com/apache/nifi/pull/2478#discussion_r169749383 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java --- @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", 
"fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," + + "by time range, by filter expression, or any combination of them. \n" + + "Order of records can be controlled by a property <code>Reversed</code>" + + "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ + @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), + @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), + @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), + @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), + @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. 
<br/>" + + "Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { + //enhanced regex for columns to allow "-" in column qualifier names + static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); + static final byte[] nl = System.lineSeparator().getBytes(); + + static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder() + .displayName("HBase Client Service") + .name("scanhbase-client-service") + .description("Specifies the Controller Service to use for accessing HBase.") + .required(true) + .identifiesControllerService(HBaseClientService.class) + .build(); + + static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .displayName("Table Name") + .name("scanhbase-table-name") + .description("The name of the HBase Table to fetch from.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor START_ROW = new PropertyDescriptor.Builder() + .displayName("Start rowkey") + .name("scanhbase-start-rowkey") + .description("The rowkey to start scan from.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor END_ROW = new PropertyDescriptor.Builder() + .displayName("End rowkey") + .name("scanhbase-end-rowkey") + .description("The row key to end scan by.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor TIME_RANGE_MIN = new PropertyDescriptor.Builder() + .displayName("Time range min") + .name("scanhbase-time-range-min") + .description("Time range min value. 
Both min and max values for time range should be either blank or provided.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.LONG_VALIDATOR) + .build(); + + static final PropertyDescriptor TIME_RANGE_MAX = new PropertyDescriptor.Builder() + .displayName("Time range max") + .name("scanhbase-time-range-max") + .description("Time range max value. Both min and max values for time range should be either blank or provided.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.LONG_VALIDATOR) + .build(); + + static final PropertyDescriptor LIMIT_ROWS = new PropertyDescriptor.Builder() + .displayName("Limit rows") + .name("scanhbase-limit") + .description("Limit number of rows retrieved by scan.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor BULK_SIZE = new PropertyDescriptor.Builder() + .displayName("Max rows per flow file") + .name("scanhbase-bulk-size") + .description("Limits number of rows in single flow file content. Set to 0 to avoid multiple flow files.") + .required(false) + .expressionLanguageSupported(true) + .defaultValue("0") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor REVERSED_SCAN = new PropertyDescriptor.Builder() + .displayName("Reversed order") + .name("scanhbase-reversed-order") + .description("Set whether this scan is a reversed one. 
This is false by default which means forward(normal) scan.") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .required(false) + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + static final PropertyDescriptor FILTER_EXPRESSION = new PropertyDescriptor.Builder() + .displayName("Filter expression") + .name("scanhbase-filter-expression") + .description("An HBase filter expression that will be applied to the scan. This property can not be used when also using the Columns property.<br/>" + + "Example: \"ValueFilter( =, 'binaryprefix:commit' )\"") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor COLUMNS = new PropertyDescriptor.Builder() + .displayName("Columns") + .name("scanhbase-columns") + .description("An optional comma-separated list of \"<colFamily>:<colQualifier>\" pairs to fetch. To return all columns " + + "for a given family, leave off the qualifier such as \"<colFamily1>,<colFamily2>\".") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN)) + .build(); + + static final AllowableValue JSON_FORMAT_FULL_ROW = new AllowableValue("full-row", "full-row", + "Creates a JSON document with the format: {\"row\":<row-id>, \"cells\":[{\"fam\":<col-fam>, \"qual\":<col-val>, \"val\":<value>, \"ts\":<timestamp>}]}."); + static final AllowableValue JSON_FORMAT_QUALIFIER_AND_VALUE = new AllowableValue("col-qual-and-val", "col-qual-and-val", + "Creates a JSON document with the format: {\"<col-qual>\":\"<value>\", \"<col-qual>\":\"<value>\"."); + + static final PropertyDescriptor JSON_FORMAT = new PropertyDescriptor.Builder() + .displayName("JSON Format") + .name("scanhbase-json-format") + .description("Specifies how to represent the HBase row as a JSON document.") + .required(true) + 
.allowableValues(JSON_FORMAT_FULL_ROW, JSON_FORMAT_QUALIFIER_AND_VALUE) + .defaultValue(JSON_FORMAT_FULL_ROW.getValue()) + .build(); + + static final PropertyDescriptor DECODE_CHARSET = new PropertyDescriptor.Builder() + .displayName("Decode Character Set") + .name("scanhbase-decode-charset") + .description("The character set used to decode data from HBase.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + static final PropertyDescriptor ENCODE_CHARSET = new PropertyDescriptor.Builder() + .displayName("Encode Character Set") + .name("scanhbase-encode-charset") + .description("The character set used to encode the JSON representation of the row.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original input file will be routed to this destination, even if no rows are retrieved based on provided conditions.") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All successful fetches are routed to this relationship.") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("All failed fetches are routed to this relationship.") + .build(); + + static final String HBASE_TABLE_ATTR = "hbase.table"; + static final String HBASE_ROWS_COUNT_ATTR = "hbase.rows.count"; + + static final List<PropertyDescriptor> properties; + static { + List<PropertyDescriptor> props = new ArrayList<>(); + props.add(HBASE_CLIENT_SERVICE); + props.add(TABLE_NAME); + props.add(START_ROW); + props.add(END_ROW); + props.add(TIME_RANGE_MIN); + props.add(TIME_RANGE_MAX); + props.add(LIMIT_ROWS); + props.add(REVERSED_SCAN); + props.add(BULK_SIZE); + props.add(FILTER_EXPRESSION); + props.add(COLUMNS); + props.add(JSON_FORMAT); 
+ props.add(ENCODE_CHARSET); + props.add(DECODE_CHARSET); + properties = Collections.unmodifiableList(props); + } + + static final Set<Relationship> relationships; + static { + Set<Relationship> rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_ORIGINAL); + rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(rels); + } + + private volatile Charset decodeCharset; + private volatile Charset encodeCharset; + private RowSerializer serializer = null; + + @OnScheduled + public void onScheduled(ProcessContext context) { + this.decodeCharset = Charset.forName(context.getProperty(DECODE_CHARSET).getValue()); + this.encodeCharset = Charset.forName(context.getProperty(ENCODE_CHARSET).getValue()); + + final String jsonFormat = context.getProperty(JSON_FORMAT).getValue(); + if (jsonFormat.equals(JSON_FORMAT_FULL_ROW.getValue())) { + this.serializer = new JsonFullRowSerializer(decodeCharset, encodeCharset); + } else { + this.serializer = new JsonQualifierAndValueRowSerializer(decodeCharset, encodeCharset); + } + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + + final List<ValidationResult> problems = new ArrayList<>(); + + final String columns = validationContext.getProperty(COLUMNS).getValue(); + final String filter = validationContext.getProperty(FILTER_EXPRESSION).getValue(); + + if (!StringUtils.isBlank(columns) && !StringUtils.isBlank(filter)) { + problems.add(new ValidationResult.Builder() + .subject(FILTER_EXPRESSION.getDisplayName()) + .input(filter).valid(false) + .explanation("A filter expression can not be used in conjunction with the Columns property") + .build()); + } + + return problems; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession 
session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + try{ + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isBlank(tableName)) { + getLogger().error("Table Name is blank or null for {}, transferring to failure", new Object[] {flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final String startRow = context.getProperty(START_ROW).evaluateAttributeExpressions(flowFile).getValue(); + final String endRow = context.getProperty(END_ROW).evaluateAttributeExpressions(flowFile).getValue(); + + final String filterExpression = context.getProperty(FILTER_EXPRESSION).evaluateAttributeExpressions(flowFile).getValue(); + + //evaluate and validate time range min and max values. They both should be either empty or provided. + Long timerangeMin = null; + Long timerangeMax = null; + + try{ + timerangeMin = context.getProperty(TIME_RANGE_MIN).evaluateAttributeExpressions(flowFile).asLong(); + }catch(Exception e){ + getLogger().error("Time range min value is not a number ({}) for {}, transferring to failure", + new Object[] {context.getProperty(TIME_RANGE_MIN).evaluateAttributeExpressions(flowFile).getValue(), flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + try{ + timerangeMax = context.getProperty(TIME_RANGE_MAX).evaluateAttributeExpressions(flowFile).asLong(); + }catch(Exception e){ + getLogger().error("Time range max value is not a number ({}) for {}, transferring to failure", + new Object[] {context.getProperty(TIME_RANGE_MAX).evaluateAttributeExpressions(flowFile).getValue(), flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + if (timerangeMin == null && timerangeMax != null) { + getLogger().error("Time range min value cannot be blank when max value provided for {}, transferring to failure", new Object[] {flowFile}); + 
session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + }else if (timerangeMin != null && timerangeMax == null) { + getLogger().error("Time range max value cannot be blank when min value provided for {}, transferring to failure", new Object[] {flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final Integer limitRows = context.getProperty(LIMIT_ROWS).evaluateAttributeExpressions(flowFile).asInteger(); + + final Boolean isReversed = context.getProperty(REVERSED_SCAN).asBoolean(); + + final Integer bulkSize = context.getProperty(BULK_SIZE).evaluateAttributeExpressions(flowFile).asInteger(); + + final List<Column> columns = getColumns(context.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue()); + final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); + + final AtomicReference<Long> rowsPulledHolder = new AtomicReference<>(0L); + final AtomicReference<Long> ffCountHolder = new AtomicReference<>(0L); + ScanHBaseResultHandler handler = new ScanHBaseResultHandler(context, session, flowFile, rowsPulledHolder, ffCountHolder, hBaseClientService, tableName, bulkSize); + + try { + hBaseClientService.scan(tableName, + startRow, endRow, + filterExpression, + timerangeMin, timerangeMax, + limitRows, + isReversed, + columns, + handler); + } catch (IOException e) { + getLogger().error("Unable to fetch rows from HBase table {} due to {}", new Object[] {tableName, e}); + flowFile = session.putAttribute(flowFile, "scanhbase.results.found", Boolean.toString(handler.isHandledAny())); + session.transfer(flowFile, REL_FAILURE); + return; + } + + LinkedList<Tuple<byte[], ResultCell[]>> hangingRows = handler.getHangingRows(); + if (!handler.isHandledAny() || // no rows found in hbase + (handler.isHandledAny() && (hangingRows == null || hangingRows.isEmpty())) // all the rows are flushed to FF inside handlers + ){ + flowFile = 
session.putAttribute(flowFile, "scanhbase.results.found", Boolean.toString(handler.isHandledAny())); + session.transfer(flowFile, REL_ORIGINAL); + session.commit(); + return; + } + + if (hangingRows != null && !hangingRows.isEmpty()) { + FlowFile lastFF = session.create(flowFile); + final Map<String, String> attributes = new HashMap<>(); + attributes.put(HBASE_TABLE_ATTR, tableName); + attributes.put(HBASE_ROWS_COUNT_ATTR, Long.toString(rowsPulledHolder.get())); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + attributes.put(HBASE_ROWS_COUNT_ATTR, Long.toString(hangingRows.size())); + lastFF = session.putAllAttributes(lastFF, attributes); + + final AtomicReference<IOException> ioe = new AtomicReference<>(null); + lastFF = session.write(lastFF, (out) -> { + for (Iterator<Tuple<byte[], ResultCell[]>> iter = hangingRows.iterator(); iter.hasNext();){ + Tuple<byte[], ResultCell[]> r = iter.next(); + serializer.serialize(r.getKey(), r.getValue(), out); + if (iter.hasNext()){ + out.write(nl); + } + } + }); + + Relationship rel = REL_SUCCESS; + IOException error = ioe.get(); + if (error != null){ + lastFF = session.putAttribute(lastFF, "scanhbase.error", error.toString()); + rel = REL_FAILURE; + } + session.transfer(lastFF, rel); + flowFile = session.putAttribute(flowFile, "scanhbase.results.found", Boolean.toString(handler.isHandledAny())); + session.transfer(flowFile, REL_ORIGINAL); + } + session.commit(); + + }catch (final Exception e) { + getLogger().error("Failed to receive data from HBase due to {}", e); + session.rollback(); + // if we failed, we want to yield so that we don't hammer hbase. + context.yield(); + } + } + + /** + * @param columnsValue a String in the form colFam:colQual,colFam:colQual + * @return a list of Columns based on parsing the given String + */ + private List<Column> getColumns(final String columnsValue) { + final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? 
new String[0] : columnsValue.split(",")); + + List<Column> columnsList = new ArrayList<>(columns.length); + + for (final String column : columns) { + if (column.contains(":")) { + final String[] parts = column.split(":"); + final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8); --- End diff -- The charset used for data presentation is configurable; it is set in onScheduled when the serializer is created. The charset for HBase row keys and column family/qualifier names is intentionally UTF-8, to be consistent with GetHBase and FetchHBase. I would prefer to open a new JIRA to handle the charset used for row keys and column families/qualifiers throughout the nifi-hbase package.
---