[ https://issues.apache.org/jira/browse/NIFI-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16107695#comment-16107695 ]
ASF GitHub Bot commented on NIFI-4024: -------------------------------------- Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/1961#discussion_r130415687 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@EventDriven +@SupportsBatching +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hadoop", "hbase", "put", "record"}) +@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.") +@ReadsAttribute(attribute = "restart.index", description = "Reads restart.index when it needs to replay part of a record set that did not get into HBase.") +@WritesAttribute(attribute = "restart.index", description = "Writes restart.index when a batch fails to be insert into HBase") +public class PutHBaseRecord extends AbstractPutHBase { + + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder() + .name("Row Identifier Field Path") + .description("Specifies the name of a record field whose value should be used as the row id for the given record.") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final String FAIL_VALUE = "Fail"; + protected static final String WARN_VALUE = "Warn"; + protected static final String IGNORE_VALUE = "Ignore"; + protected static final String TEXT_VALUE = "Text"; + + protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values."); + protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase."); + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase."); + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column."); + + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder() + .name("Complex Field Strategy") + .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.") + .expressionLanguageSupported(false) + .required(true) + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT) + .defaultValue(COMPLEX_FIELD_TEXT.getValue()) + .build(); + + + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE, + "Stores the value of each field as a UTF-8 String."); + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE, + "Stores the value of each field as the byte representation of the type derived from the record."); + + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder() + .name("Field Encoding Strategy") + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " + + "record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " + + "the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the " + + "byte representation of that integer.")) + .required(true) + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES) + .defaultValue(FIELD_ENCODING_STRING.getValue()) + .build(); + + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The maximum number of records to be sent to HBase at any one time from the record set.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1000") + .build(); + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(RECORD_READER_FACTORY); + properties.add(HBASE_CLIENT_SERVICE); + properties.add(TABLE_NAME); + properties.add(ROW_FIELD_NAME); + properties.add(ROW_ID_ENCODING_STRATEGY); + properties.add(COLUMN_FAMILY); + properties.add(BATCH_SIZE); + properties.add(COMPLEX_FIELD_STRATEGY); + properties.add(FIELD_ENCODING_STRATEGY); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + return rels; + } + + private int addBatch(String tableName, List<PutFlowFile> flowFiles) throws IOException { + int columns = 0; + clientService.put(tableName, flowFiles); + for (PutFlowFile put : flowFiles) { + columns += put.getColumns().size(); + } + + return columns; + } + + private RecordReaderFactory recordParserFactory; + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY) + .asControllerService(RecordReaderFactory.class); + List<PutFlowFile> flowFiles = new ArrayList<>(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue(); + final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue(); + final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue(); + final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue(); + + final long start = System.nanoTime(); + int index = 0; + int columns = 0; + boolean failed = false; + String startIndexStr = flowFile.getAttribute("restart.index"); + int startIndex = -1; + if (startIndexStr != null) { + startIndex = Integer.parseInt(startIndexStr); + } + + PutFlowFile last = null; + try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) { + Record record; + if (startIndex >= 0) { + while ( index++ < startIndex && (reader.nextRecord()) != null) {} + } + + while ((record = reader.nextRecord()) != null) { + PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy); + flowFiles.add(putFlowFile); + index++; + + if (flowFiles.size() == batchSize) { + columns += addBatch(tableName, flowFiles); + last = flowFiles.get(flowFiles.size() - 1); + flowFiles = new ArrayList<>(); + } + } + if (flowFiles.size() > 0) { + columns += addBatch(tableName, flowFiles); + last = flowFiles.get(flowFiles.size() - 1); + } + } catch (Exception ex) { + getLogger().error("Failed to put records to HBase.", ex); + failed = true; + } + + if (!failed) { + sendProvenance(session, flowFile, columns, System.nanoTime() - start, last); + flowFile = session.removeAttribute(flowFile, "restart.index"); + session.transfer(flowFile, REL_SUCCESS); + } else { + String restartIndex = Integer.toString(index - flowFiles.size()); + flowFile = session.putAttribute(flowFile, "restart.index", restartIndex); + sendProvenance(session, flowFile, columns, System.nanoTime() - start, last); --- End diff -- Should we wrap this in a conditional like "if columns > 0" then send provenance? This would stop us from reporting a provenance event for cases where we didn't send anything successfully. > Create EvaluateRecordPath processor > ----------------------------------- > > Key: NIFI-4024 > URL: https://issues.apache.org/jira/browse/NIFI-4024 > Project: Apache NiFi > Issue Type: New Feature > Reporter: Steve Champagne > Priority: Minor > > With the new RecordPath DSL, it would be nice if there was a processor that > could pull fields into attributes of the flowfile based on a RecordPath. This > would be similar to the EvaluateJsonPath processor that currently exists, > except it could be used to pull fields from arbitrary record formats. My > current use case for it would be pulling fields out of Avro records while > skipping the steps of having to convert Avro to JSON, evaluate JsonPath, and > then converting back to Avro. -- This message was sent by Atlassian JIRA (v6.4.14#64029)