Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114154289
  
    --- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
 ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hadoop;
    +
    +import org.apache.commons.io.input.NullInputStream;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.security.AccessControlException;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.configuration.DefaultSettings;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.FlowFileAccessException;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.net.URI;
    +import java.security.PrivilegedAction;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Base processor for reading a data from HDFS that can be fetched into 
records.
    + */
    +@TriggerWhenEmpty // trigger when empty so we have a chance to perform a 
Kerberos re-login
    +@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield 
since we are triggering when empty
    +public abstract class AbstractFetchHDFSRecord extends 
AbstractHadoopProcessor {
    +
    +    public static final PropertyDescriptor FILENAME = new 
PropertyDescriptor.Builder()
    +            .name("filename")
    +            .displayName("Filename")
    +            .description("The name of the file to retrieve")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("${path}/${filename}")
    +            
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
    +            .name("record-writer")
    +            .displayName("Record Writer")
    +            .description("The service for writing records to the FlowFile 
content")
    +            .identifiesControllerService(RecordSetWriterFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("FlowFiles will be routed to this relationship 
once they have been updated with the content of the file")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles will be routed to this relationship if 
the content of the file cannot be retrieved and trying again will likely not be 
helpful. "
    +                    + "This would occur, for instance, if the file is not 
found or if there is a permissions issue")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("FlowFiles will be routed to this relationship if 
the content of the file cannot be retrieved, but might be able to be in the 
future if tried again. "
    +                    + "This generally indicates that the Fetch should be 
tried again.")
    +            .build();
    +
    +    public static final String FETCH_FAILURE_REASON_ATTR = 
"fetch.failure.reason";
    +    public static final String RECORD_COUNT_ATTR = "record.count";
    +
    +    private volatile Set<Relationship> fetchHdfsRecordRelationships;
    +    private volatile List<PropertyDescriptor> fetchHdfsRecordProperties;
    +
    +    @Override
    +    protected final void init(final ProcessorInitializationContext 
context) {
    +        super.init(context);
    +
    +        final Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_RETRY);
    +        rels.add(REL_FAILURE);
    +        this.fetchHdfsRecordRelationships = 
Collections.unmodifiableSet(rels);
    +
    +        final List<PropertyDescriptor> props = new ArrayList<>(properties);
    +        props.add(FILENAME);
    +        props.add(RECORD_WRITER);
    +        props.addAll(getAdditionalProperties());
    +        this.fetchHdfsRecordProperties = 
Collections.unmodifiableList(props);
    +    }
    +
    +    /**
    +     * Allows sub-classes to add additional properties, called from 
initialize.
    +     *
    +     * @return additional properties to add to the overall list
    +     */
    +    public List<PropertyDescriptor> getAdditionalProperties() {
    +        return Collections.emptyList();
    +    }
    +
    +    @Override
    +    public final Set<Relationship> getRelationships() {
    +        return fetchHdfsRecordRelationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        return fetchHdfsRecordProperties;
    +    }
    +
    +    /**
    +     * Sub-classes provide the appropriate HDFSRecordReader.
    +     *
    +     * @param context the process context to obtain additional 
configuration
    +     * @param flowFile the flow file being written
    +     * @param conf the Configuration instance
    +     * @param path the path to write to
    +     * @return the HDFSRecordWriter
    +     * @throws IOException if an error occurs creating the writer
    +     */
    +    public abstract HDFSRecordReader createHDFSRecordReader(final 
ProcessContext context, final FlowFile flowFile, final Configuration conf, 
final Path path)
    +            throws IOException;
    +
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +        // do this before getting a flow file so that we always get a 
chance to attempt Kerberos relogin
    +        final FileSystem fileSystem = getFileSystem();
    +        final Configuration configuration = getConfiguration();
    +        final UserGroupInformation ugi = getUserGroupInformation();
    +
    +        if (configuration == null || fileSystem == null || ugi == null) {
    +            getLogger().error("Processor not configured properly because 
Configuration, FileSystem, or UserGroupInformation was null");
    +            context.yield();
    +            return;
    +        }
    +
    +        final FlowFile originalFlowFile = session.get();
    +        if (originalFlowFile == null ) {
    +            context.yield();
    +            return;
    +        }
    +
    +
    +        ugi.doAs((PrivilegedAction<Object>)() -> {
    +            FlowFile child = null;
    +            final String filenameValue = 
context.getProperty(FILENAME).evaluateAttributeExpressions(originalFlowFile).getValue();
    +            try {
    +                final Path path = new Path(filenameValue);
    +                final AtomicReference<Throwable> exceptionHolder = new 
AtomicReference<>(null);
    +                final AtomicReference<WriteResult> writeResult = new 
AtomicReference<>();
    +
    +                final RecordSetWriterFactory recordSetWriterFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
    +                final RecordSetWriter recordSetWriter = 
recordSetWriterFactory.createWriter(getLogger(), originalFlowFile, new 
NullInputStream(0));
    +
    +                final StopWatch stopWatch = new StopWatch(true);
    +
    +                // use a child FlowFile so that if any error occurs we can 
route the original untouched FlowFile to retry/failure
    +                child = session.create(originalFlowFile);
    +                child = session.write(child, (final OutputStream rawOut) 
-> {
    +                    try (final BufferedOutputStream out = new 
BufferedOutputStream(rawOut);
    +                         final HDFSRecordReader recordReader = 
createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
    +
    +                        final RecordSchema emptySchema = new 
SimpleRecordSchema(Collections.emptyList());
    +
    +                        final RecordSet recordSet = new RecordSet() {
    +                            @Override
    +                            public RecordSchema getSchema() throws 
IOException {
    +                                return emptySchema;
    +                            }
    +
    +                            @Override
    +                            public Record next() throws IOException {
    +                                return recordReader.nextRecord();
    +                            }
    +                        };
    +
    +                        writeResult.set(recordSetWriter.write(recordSet, 
out));
    +                    } catch (Exception e) {
    +                        exceptionHolder.set(e);
    +                    }
    +                });
    +
    +                stopWatch.stop();
    +
    +                // if any errors happened within the session.write then 
throw the exception so we jump
    +                // into one of the appropriate catch blocks below
    +                if (exceptionHolder.get() != null) {
    +                    throw exceptionHolder.get();
    +                }
    +
    +                FlowFile successFlowFile = postProcess(context, session, 
child, path);
    +
    +                final Map<String,String> attributes = new 
HashMap<>(writeResult.get().getAttributes());
    +                attributes.put(RECORD_COUNT_ATTR, 
String.valueOf(writeResult.get().getRecordCount()));
    +                attributes.put(CoreAttributes.MIME_TYPE.key(), 
recordSetWriter.getMimeType());
    +                successFlowFile = 
session.putAllAttributes(successFlowFile, attributes);
    +
    +                final URI uri = path.toUri();
    +                getLogger().info("Successfully received content from {} 
for {} in {}", new Object[] {uri, successFlowFile, stopWatch.getDuration()});
    --- End diff --
    
    Add unit of `milliseconds` to the log output of the duration. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to