[ https://issues.apache.org/jira/browse/NIFI-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078590#comment-16078590 ]

ASF GitHub Bot commented on NIFI-4024:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1961#discussion_r126214244
  
    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java ---
    @@ -0,0 +1,309 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hbase.put.PutColumn;
    +import org.apache.nifi.hbase.put.PutFlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"hadoop", "hbase", "put", "record"})
    +@CapabilityDescription("Adds rows to HBase based on the contents of a 
flowfile using a configured record reader.")
    +public class PutHBaseRecord extends AbstractPutHBase {
    +
    +    protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
    +            .name("Row Identifier Field Name")
    +            .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    protected static final String FAIL_VALUE = "Fail";
    +    protected static final String WARN_VALUE = "Warn";
    +    protected static final String IGNORE_VALUE = "Ignore";
    +    protected static final String TEXT_VALUE = "Text";
    +
    +    protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
    +    protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
    +    protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
    +    protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
    +
    +    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
    +            .name("Complex Field Strategy")
    +            .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
    +            .expressionLanguageSupported(false)
    +            .required(true)
    +            .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
    +            .defaultValue(COMPLEX_FIELD_TEXT.getValue())
    +            .build();
    +
    +
    +    protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
    +            "Stores the value of each field as a UTF-8 String.");
    +    protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
    +            "Stores the value of each field as the byte representation of the type derived from the record.");
    +
    +    protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
    +            .name("Field Encoding Strategy")
    +            .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
    +                    "record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
    +                    "the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
    +                    "byte representation of that integer."))
    +            .required(true)
    +            .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
    +            .defaultValue(FIELD_ENCODING_STRING.getValue())
    +            .build();
    +
    +    protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .description("The maximum number of records to be sent to HBase at any one time from the record set.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(RECORD_READER_FACTORY);
    +        properties.add(HBASE_CLIENT_SERVICE);
    +        properties.add(TABLE_NAME);
    +        properties.add(ROW_ID);
    +        properties.add(ROW_FIELD_NAME);
    +        properties.add(ROW_ID_ENCODING_STRATEGY);
    +        properties.add(COLUMN_FAMILY);
    +        properties.add(BATCH_SIZE);
    +        properties.add(COMPLEX_FIELD_STRATEGY);
    +        properties.add(FIELD_ENCODING_STRATEGY);
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_FAILURE);
    +        return rels;
    +    }
    +
    +    private int addBatch(String tableName, List<PutFlowFile> flowFiles) throws IOException {
    +        int columns = 0;
    +        clientService.put(tableName, flowFiles);
    +        for (PutFlowFile put : flowFiles) {
    +            columns += put.getColumns().size();
    +        }
    +
    +        return columns;
    +    }
    +
    +    private RecordReaderFactory recordParserFactory;
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
    +                .asControllerService(RecordReaderFactory.class);
    +        List<PutFlowFile> flowFiles = new ArrayList<>();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
    +        final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
    +        final long start = System.nanoTime();
    +        int index = 0;
    +        int columns = 0;
    +        boolean failed = false;
    +        String startIndexStr = flowFile.getAttribute("restart.index");
    +        int startIndex = -1;
    +        if (startIndexStr != null) {
    +            startIndex = Integer.parseInt(startIndexStr);
    +        }
    +
    +        PutFlowFile first = null;
    +        PutFlowFile last  = null;
    +        try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
    +            Record record;
    +            if (startIndex >= 0) {
    +                while ( index++ < startIndex && (reader.nextRecord()) != 
null) {}
    +            }
    +
    +            while ((record = reader.nextRecord()) != null) {
    +                PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy);
    +                flowFiles.add(putFlowFile);
    +                if (index == 0) {
    +                    first = putFlowFile;
    +                }
    +                index++;
    +
    +                if (flowFiles.size() == batchSize) {
    +                    columns += addBatch(tableName, flowFiles);
    +                    last = flowFiles.get(flowFiles.size() - 1);
    +                    flowFiles = new ArrayList<>();
    +                }
    +            }
    +            if (flowFiles.size() > 0) {
    +                columns += addBatch(tableName, flowFiles);
    +                last = flowFiles.get(flowFiles.size() - 1);
    +            }
    +        } catch (Exception ex) {
    +            getLogger().error("Failed to put records to HBase.", ex);
    +            failed = true;
    +        }
    +        
    +        if (!failed) {
    +            long sendMillis = System.nanoTime() - start;
    +            List<String> urls = Arrays.asList(getTransitUri(first), 
getTransitUri(last));
    +            final String details = String.format("Put %d cells to HBase.", 
columns);
    +            session.getProvenanceReporter().send(flowFile, urls.get(0), 
details, sendMillis);
    +            session.getProvenanceReporter().send(flowFile, urls.get(1), 
details, sendMillis);
    +            session.transfer(flowFile, REL_SUCCESS);
    +        } else {
    +            String restartIndex = Integer.toString(index - flowFiles.size());
    +            flowFile = session.putAttribute(flowFile, "restart.index", restartIndex);
    +            flowFile = session.penalize(flowFile);
    +            session.transfer(flowFile, REL_FAILURE);
    +        }
    +
    +
    +
    +        session.commit();
    +    }
    +
    +
    +    @Override
    +    protected PutFlowFile createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) {
    +        return null;
    +    }
    +
    +    protected byte[] asBytes(String field, Object input) {
    --- End diff --
    
    Looks like this method might not be used anymore and could be removed?
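
    For context on the two encodings the Field Encoding Strategy description above refers to (store the UTF-8 string form vs. the byte representation of the derived type), here is a minimal, self-contained sketch of what a field-to-bytes helper along those lines could look like. It is an illustration only; the class name and the ByteBuffer-based layout are assumptions, not the asBytes implementation from this PR:

        import java.nio.ByteBuffer;
        import java.nio.charset.StandardCharsets;

        // Hypothetical helper illustrating the String vs. Bytes field encoding
        // strategies described by FIELD_ENCODING_STRATEGY; not the PR's code.
        final class FieldEncodingSketch {

            static byte[] asBytes(final Object value, final boolean stringEncoding) {
                if (value == null) {
                    return new byte[0];
                }
                if (stringEncoding) {
                    // "String" strategy: store the UTF-8 bytes of the string form.
                    return value.toString().getBytes(StandardCharsets.UTF_8);
                }
                // "Bytes" strategy: store the byte representation of the derived type.
                if (value instanceof Long || value instanceof Integer) {
                    return ByteBuffer.allocate(Long.BYTES).putLong(((Number) value).longValue()).array();
                }
                if (value instanceof Double || value instanceof Float) {
                    return ByteBuffer.allocate(Double.BYTES).putDouble(((Number) value).doubleValue()).array();
                }
                if (value instanceof Boolean) {
                    return new byte[] { (byte) (((Boolean) value) ? 1 : 0) };
                }
                // Fall back to the string form for anything without a fixed-width layout.
                return value.toString().getBytes(StandardCharsets.UTF_8);
            }
        }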


> Create EvaluateRecordPath processor
> -----------------------------------
>
>                 Key: NIFI-4024
>                 URL: https://issues.apache.org/jira/browse/NIFI-4024
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Steve Champagne
>            Priority: Minor
>
> With the new RecordPath DSL, it would be nice if there was a processor that 
> could pull fields into attributes of the flowfile based on a RecordPath. This 
> would be similar to the EvaluateJsonPath processor that currently exists, 
> except it could be used to pull fields from arbitrary record formats. My 
> current use case for it would be pulling fields out of Avro records while 
> skipping the steps of converting Avro to JSON, evaluating JsonPath, and 
> then converting back to Avro. 
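
As a rough illustration of the requested behavior, here is a minimal sketch of the field lookup such a processor could perform, assuming the existing nifi-record-path API (RecordPath.compile / evaluate) that other record-aware components use. The class name, the example path, and the attribute-extraction shape are assumptions, not an actual EvaluateRecordPath implementation:

    import org.apache.nifi.record.path.FieldValue;
    import org.apache.nifi.record.path.RecordPath;
    import org.apache.nifi.record.path.RecordPathResult;
    import org.apache.nifi.serialization.record.Record;

    import java.util.Optional;

    // Sketch of the core of an EvaluateRecordPath-style lookup: compile a
    // RecordPath, evaluate it against a record, and take the first matching
    // field value as the candidate flowfile attribute value.
    final class RecordPathLookupSketch {

        static Optional<String> firstMatch(final Record record, final String path) {
            final RecordPath recordPath = RecordPath.compile(path);   // e.g. "/name"
            final RecordPathResult result = recordPath.evaluate(record);
            return result.getSelectedFields()
                    .map(FieldValue::getValue)
                    .filter(java.util.Objects::nonNull)
                    .map(Object::toString)
                    .findFirst();
        }
    }

A processor built this way could compile each configured path once at schedule time and write the resulting values to attributes, mirroring how EvaluateJsonPath handles multiple dynamic properties.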



