Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114157766
  
    --- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
 ---
    @@ -0,0 +1,505 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hadoop;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.permission.FsPermission;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.configuration.DefaultSettings;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.FlowFileAccessException;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.hadoop.exception.FailureException;
    +import 
org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
    +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
    +import org.apache.nifi.schema.access.SchemaAccessStrategy;
    +import org.apache.nifi.schema.access.SchemaAccessUtils;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.BufferedInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.security.PrivilegedAction;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static 
org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
    +import static 
org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
    +import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
    +import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
    +import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
    +import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
    +
    +/**
    + * Base class for processors that write Records to HDFS.
    + */
    +@TriggerWhenEmpty // trigger when empty so we have a chance to perform a 
Kerberos re-login
    +@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield 
since we are triggering when empty
    +public abstract class AbstractPutHDFSRecord extends 
AbstractHadoopProcessor {
    +
    +
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new 
PropertyDescriptor.Builder()
    +            .name("compression-type")
    +            .displayName("Compression Type")
    +            .description("The type of compression for the file being 
written.")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor OVERWRITE = new 
PropertyDescriptor.Builder()
    +            .name("overwrite")
    +            .displayName("Overwrite Files")
    +            .description("Whether or not to overwrite existing files in 
the same directory with the same name. When set to false, " +
    +                    "flow files will be routed to failure when a file 
exists in the same directory with the same name.")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor UMASK = new 
PropertyDescriptor.Builder()
    +            .name("permissions-umask")
    +            .displayName("Permissions umask")
    +            .description("A umask represented as an octal number which 
determines the permissions of files written to HDFS. " +
    +                    "This overrides the Hadoop Configuration 
dfs.umaskmode")
    +            .addValidator(HadoopValidators.UMASK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor REMOTE_OWNER = new 
PropertyDescriptor.Builder()
    +            .name("remote-owner")
    +            .displayName("Remote Owner")
    +            .description("Changes the owner of the HDFS file to this value 
after it is written. " +
    +                    "This only works if NiFi is running as a user that has 
HDFS super user privilege to change owner")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor REMOTE_GROUP = new 
PropertyDescriptor.Builder()
    +            .name("remote-group")
    +            .displayName("Remote Group")
    +            .description("Changes the group of the HDFS file to this value 
after it is written. " +
    +                    "This only works if NiFi is running as a user that has 
HDFS super user privilege to change group")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The service for reading records from incoming 
flow files.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("Flow Files that have been successfully processed 
are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("Flow Files that could not be processed due to 
issues that can be retried are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("Flow Files that could not be processed due to 
issue that cannot be retried are transferred to this relationship")
    +            .build();
    +
    +    public static final String RECORD_COUNT_ATTR = "record.count";
    +
    +    private volatile String remoteOwner;
    +    private volatile String remoteGroup;
    +    private volatile SchemaAccessStrategy schemaAccessStrategy;
    +
    +    private volatile Set<Relationship> putHdfsRecordRelationships;
    +    private volatile List<PropertyDescriptor> putHdfsRecordProperties;
    +
    +    private final List<AllowableValue> strategyList = 
Collections.unmodifiableList(Arrays.asList(
    +            SCHEMA_NAME_PROPERTY,
    +            SCHEMA_TEXT_PROPERTY,
    +            HWX_SCHEMA_REF_ATTRIBUTES,
    +            HWX_CONTENT_ENCODED_SCHEMA
    +    ));
    +
    +
    +    @Override
    +    protected final void init(final ProcessorInitializationContext 
context) {
    +        super.init(context);
    +
    +        final Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_RETRY);
    +        rels.add(REL_FAILURE);
    +        this.putHdfsRecordRelationships = 
Collections.unmodifiableSet(rels);
    +
    +        final List<PropertyDescriptor> props = new ArrayList<>(properties);
    +        props.add(RECORD_READER);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(DIRECTORY)
    +                .description("The parent directory to which files should 
be written. Will be created if it doesn't exist.")
    +                .build());
    +
    +        final AllowableValue[] strategies = 
getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
    +                .description("Specifies how to obtain the schema that is 
to be used for writing the data.")
    +                .allowableValues(strategies)
    +                .defaultValue(getDefaultSchemaAccessStrategy().getValue())
    +                .build());
    +
    +        props.add(SCHEMA_REGISTRY);
    +        props.add(SCHEMA_NAME);
    +        props.add(SCHEMA_TEXT);
    +
    +        final AllowableValue[] compressionTypes = 
getCompressionTypes(context).toArray(new AllowableValue[0]);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(COMPRESSION_TYPE)
    +                .allowableValues(compressionTypes)
    +                .defaultValue(getDefaultCompressionType(context))
    +                .build());
    +
    +        props.add(OVERWRITE);
    +        props.add(UMASK);
    +        props.add(REMOTE_GROUP);
    +        props.add(REMOTE_OWNER);
    +        props.addAll(getAdditionalProperties());
    +        this.putHdfsRecordProperties = Collections.unmodifiableList(props);
    +    }
    +
    +    protected List<AllowableValue> getSchemaAccessStrategyValues() {
    +        return strategyList;
    +    }
    +
    +    protected AllowableValue getDefaultSchemaAccessStrategy() {
    +        return SCHEMA_NAME_PROPERTY;
    +    }
    +
    +    private PropertyDescriptor getSchemaAcessStrategyDescriptor() {
    +        return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    +    }
    +
    +    /**
    +     * @param context the initialization context
    +     * @return the possible compression types
    +     */
    +    public abstract List<AllowableValue> getCompressionTypes(final 
ProcessorInitializationContext context);
    +
    +    /**
    +     * @param context the initialization context
    +     * @return the default compression type
    +     */
    +    public abstract String getDefaultCompressionType(final 
ProcessorInitializationContext context);
    +
    +    /**
    +     * Allows sub-classes to add additional properties, called from 
initialize.
    +     *
    +     * @return additional properties to add to the overall list
    +     */
    +    public List<PropertyDescriptor> getAdditionalProperties() {
    +        return Collections.emptyList();
    +    }
    +
    +    @Override
    +    public final Set<Relationship> getRelationships() {
    +        return putHdfsRecordRelationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +       return putHdfsRecordProperties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
    +        final String schemaAccessStrategy = 
validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
    +        return 
SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, 
schemaAccessStrategy, getSchemaAccessStrategyValues());
    +    }
    +
    +    @OnScheduled
    +    public final void onScheduled(final ProcessContext context) throws 
IOException {
    +        super.abstractOnScheduled(context);
    +
    +        final SchemaRegistry schemaRegistry = 
context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
    +
    +        final PropertyDescriptor descriptor = 
getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    +        final String schemaAccess = 
context.getProperty(descriptor).getValue();
    +        this.schemaAccessStrategy = 
SchemaAccessUtils.getSchemaAccessStrategy(schemaAccess, schemaRegistry, 
context);
    +
    +        this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
    +        this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
    +
    +        // Set umask once, to avoid thread safety issues doing it in 
onTrigger
    +        final PropertyValue umaskProp = context.getProperty(UMASK);
    +        final short dfsUmask;
    +        if (umaskProp.isSet()) {
    +            dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
    +        } else {
    +            dfsUmask = FsPermission.DEFAULT_UMASK;
    +        }
    +        final Configuration conf = getConfiguration();
    +        FsPermission.setUMask(conf, new FsPermission(dfsUmask));
    +    }
    +
    +    /**
    +     * Sub-classes provide the appropriate HDFSRecordWriter.
    +     *
    +     * @param context the process context to obtain additional 
configuration
    +     * @param flowFile the flow file being written
    +     * @param conf the Configuration instance
    +     * @param path the path to write to
    +     * @param schema the schema for writing
    +     * @return the HDFSRecordWriter
    +     * @throws IOException if an error occurs creating the writer or 
processing the schema
    +     */
    +    public abstract HDFSRecordWriter createHDFSRecordWriter(
    +            final ProcessContext context,
    +            final FlowFile flowFile,
    +            final Configuration conf,
    +            final Path path,
    +            final RecordSchema schema) throws IOException, 
SchemaNotFoundException;
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +        // do this before getting a flow file so that we always get a 
chance to attempt Kerberos relogin
    +        final FileSystem fileSystem = getFileSystem();
    +        final Configuration configuration = getConfiguration();
    +        final UserGroupInformation ugi = getUserGroupInformation();
    +
    +        if (configuration == null || fileSystem == null || ugi == null) {
    +            getLogger().error("Processor not configured properly because 
Configuration, FileSystem, or UserGroupInformation was null");
    +            context.yield();
    +            return;
    +        }
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            context.yield();
    +            return;
    +        }
    +
    +        ugi.doAs((PrivilegedAction<Object>)() -> {
    +            Path tempDotCopyFile = null;
    +            FlowFile putFlowFile = flowFile;
    +            try {
    +                final String filenameValue = 
putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); // TODO codec extension
    +                final String directoryValue = 
context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
    +
    +                // create the directory if it doesn't exist
    +                final Path directoryPath = new Path(directoryValue);
    +                createDirectory(fileSystem, directoryPath, remoteOwner, 
remoteGroup);
    +
    +                // write to tempFile first and on success rename to 
destFile
    +                final Path tempFile = new Path(directoryPath, "." + 
filenameValue);
    +                final Path destFile = new Path(directoryPath, 
filenameValue);
    +
    +                final boolean destinationExists = 
fileSystem.exists(destFile) || fileSystem.exists(tempFile);
    +                final boolean shouldOverwrite = 
context.getProperty(OVERWRITE).asBoolean();
    +
    +                // if the tempFile or destFile already exist, and 
overwrite is set to false, then transfer to failure
    +                if (destinationExists && !shouldOverwrite) {
    +                    session.transfer(session.penalize(putFlowFile), 
REL_FAILURE);
    +                    getLogger().warn("penalizing {} and routing to failure 
because file with same name already exists", new Object[]{putFlowFile});
    +                    return null;
    +                }
    +
    +                final AtomicReference<Throwable> exceptionHolder = new 
AtomicReference<>(null);
    +                final AtomicReference<WriteResult> writeResult = new 
AtomicReference<>();
    +                final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +
    +                final FlowFile flowFileIn = putFlowFile;
    +                final StopWatch stopWatch = new StopWatch(true);
    +
    +                // Read records from the incoming FlowFile and write them 
the tempFile
    +                session.read(putFlowFile, (final InputStream rawIn) -> {
    +                    RecordReader recordReader = null;
    +                    HDFSRecordWriter recordWriter = null;
    +
    +                    try (final BufferedInputStream in = new 
BufferedInputStream(rawIn)) {
    +                        final RecordSchema destRecordSchema = 
schemaAccessStrategy.getSchema(flowFile, in);
    +                        recordWriter = createHDFSRecordWriter(context, 
flowFile, configuration, tempFile, destRecordSchema);
    +
    +                        // if we fail to create the RecordReader then we 
want to route to failure, so we need to
    +                        // handle this separately from the other 
IOExceptions which normally rout to retry
    +                        try {
    +                            recordReader = 
recordReaderFactory.createRecordReader(flowFileIn, in, getLogger());
    +                        } catch (Exception e) {
    +                            final RecordReaderFactoryException rrfe = new 
RecordReaderFactoryException("Unable to create RecordReader", e);
    +                            exceptionHolder.set(rrfe);
    +                            return;
    +                        }
    +
    +                        final RecordSet recordSet = 
recordReader.createRecordSet();
    +                        writeResult.set(recordWriter.write(recordSet));
    +
    +                    } catch (Exception e) {
    +                        exceptionHolder.set(e);
    +                    } finally {
    +                        IOUtils.closeQuietly(recordReader);
    +                        IOUtils.closeQuietly(recordWriter);
    +                    }
    +                });
    +                stopWatch.stop();
    +
    +                final String dataRate = 
stopWatch.calculateDataRate(putFlowFile.getSize());
    +                final long millis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
    +                tempDotCopyFile = tempFile;
    +
    +                // if any errors happened within the session.read then 
throw the exception so we jump
    +                // into one of the appropriate catch blocks below
    +                if (exceptionHolder.get() != null) {
    +                    throw exceptionHolder.get();
    +                }
    +
    +                // Attempt to rename from the tempFile to destFile, and 
change owner if successfully renamed
    +                rename(fileSystem, tempFile, destFile);
    +                changeOwner(fileSystem, destFile, remoteOwner, 
remoteGroup);
    +
    +                getLogger().info("Wrote {} to {} in {} milliseconds at a 
rate of {}", new Object[]{putFlowFile, destFile, millis, dataRate});
    +
    +                putFlowFile = postProcess(context, session, putFlowFile, 
destFile);
    +
    +                final String outputPath = destFile.toString();
    +                final String newFilename = destFile.getName();
    +                final String hdfsPath = destFile.getParent().toString();
    +
    +                // Update the filename and absolute path attributes
    +                final Map<String,String> attributes = new 
HashMap<>(writeResult.get().getAttributes());
    +                attributes.put(CoreAttributes.FILENAME.key(), newFilename);
    +                attributes.put(ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
    +                attributes.put(RECORD_COUNT_ATTR, 
String.valueOf(writeResult.get().getRecordCount()));
    +                putFlowFile = session.putAllAttributes(putFlowFile, 
attributes);
    +
    +                // Send a provenance event and transfer to success
    +                final String transitUri = (outputPath.startsWith("/")) ? 
"hdfs:/" + outputPath : "hdfs://" + outputPath;
    +                session.getProvenanceReporter().send(putFlowFile, 
transitUri);
    +                session.transfer(putFlowFile, REL_SUCCESS);
    +
    +            } catch (IOException | FlowFileAccessException e) {
    +                deleteQuietly(fileSystem, tempDotCopyFile);
    +                getLogger().error("Failed to write due to {}", new 
Object[]{e});
    +                session.transfer(session.penalize(putFlowFile), REL_RETRY);
    +                context.yield();
    +            } catch (Throwable t) {
    +                deleteQuietly(fileSystem, tempDotCopyFile);
    +                getLogger().error("Failed to write due to {}", new 
Object[]{t});
    +                session.transfer(putFlowFile, REL_FAILURE);
    +            }
    +
    +            return null;
    +        });
    +    }
    +
    +    /**
    +     * This method will be called after successfully writing to the 
destination file and renaming the file to it's final name
    +     * in order to give sub-classes a chance to take action before 
transferring to success.
    +     *
    +     * @param context the context
    +     * @param session the session
    +     * @param flowFile the flow file being processed
    +     * @param destFile the destination file written to
    +     * @return an updated FlowFile reference
    +     */
    +    protected FlowFile postProcess(final ProcessContext context, final 
ProcessSession session, final FlowFile flowFile, final Path destFile) {
    +        return flowFile;
    +    }
    +
    +    protected void rename(final FileSystem fileSystem, final Path srcFile, 
final Path destFile) throws IOException, InterruptedException, FailureException 
{
    +        boolean renamed = false;
    +        for (int i = 0; i < 10; i++) { // try to rename multiple times.
    +            if (fileSystem.rename(srcFile, destFile)) {
    +                renamed = true;
    +                break;// rename was successful
    +            }
    +            Thread.sleep(200L);// try waiting to let whatever might cause 
rename failure to resolve
    +        }
    +        if (!renamed) {
    --- End diff --
    
    My read of this is that if the rename operation fails 10x, the source file 
is deleted. Is that captured anywhere in the docs/Javadocs, etc.? Would be a 
little confusing for a user unless the only context for this method is renaming 
the temporary file to the persistent one. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to