[ 
https://issues.apache.org/jira/browse/NIFI-1022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338378#comment-15338378
 ] 

ASF GitHub Bot commented on NIFI-1022:
--------------------------------------

Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/379#discussion_r67610439
  
    --- Diff: 
nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/GetAlluxio.java
 ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.alluxio;
    +
    +import alluxio.AlluxioURI;
    +import alluxio.client.ReadType;
    +import alluxio.client.file.FileInStream;
    +import alluxio.client.file.URIStatus;
    +import alluxio.client.file.options.OpenFileOptions;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"alluxio", "tachyon", "get", "file"})
    +@EventDriven
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@CapabilityDescription("This processor will access the file using the 
input URI provided and write the content of "
    +        + "the remote file to the content of the incoming FlowFile.")
    +public class GetAlluxio extends AbstractAlluxioProcessor {
    +
    +    public static final PropertyDescriptor READ_TYPE = new 
PropertyDescriptor.Builder()
    +            .name("alluxio-read-type")
    +            .displayName("Read type")
    +            .description("The Read Type to use when accessing the remote 
file")
    +            .defaultValue(ReadType.CACHE_PROMOTE.toString())
    +            .required(true)
    +            .allowableValues(ReadType.values())
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("All files successfully retrieved are routed to 
this relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("In case of failure, flow files will be routed to 
this relationship")
    +            .autoTerminateDefault(true)
    +            .build();
    +    public static final Relationship REL_SUCCESS_REQ = new 
Relationship.Builder()
    +            .name("original")
    +            .description("In case of success, the original FlowFile will 
be routed to this relationship")
    +            .autoTerminateDefault(true)
    +            .build();
    +
    +    private final static Set<Relationship> relationships;
    +
    +
    +    /*
    +     * Will ensure that the list of property descriptors is built only 
once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(READ_TYPE);
    +        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_SUCCESS_REQ);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        createFileSystem(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +        FlowFile request = null;
    +        if (context.hasIncomingConnection()) {
    +            request = session.get();
    +
    +            // If we have no FlowFile, and all incoming connections are 
self-loops then we can continue on.
    +            // However, if we have no FlowFile and we have connections 
coming from other Processors, then
    +            // we know that we should run only if we have a FlowFile.
    +            if (request == null && context.hasNonLoopConnection()) {
    +                return;
    +            }
    +        }
    +
    +        final StopWatch stopWatch = new StopWatch(true);
    +        final String uri = 
context.getProperty(URI).evaluateAttributeExpressions(request).getValue();
    +        final AlluxioURI path = new AlluxioURI(uri);
    +        final OpenFileOptions options = 
OpenFileOptions.defaults().setReadType(ReadType.valueOf(context.getProperty(READ_TYPE).getValue()));
    +
    +        FileInStream in = null;
    +        FlowFile flowFile = null;
    +
    +        if(request == null) {
    +            flowFile = session.create(request);
    +        } else {
    +            flowFile = session.create();
    +        }
    +
    +        try {
    +            final URIStatus status = fileSystem.get().getStatus(path);
    +            flowFile = updateFlowFile(status, flowFile, session);
    +
    +            in = fileSystem.get().openFile(path, options);
    +            final FileInStream toCopy = in;
    +
    +            flowFile = session.write(flowFile, new OutputStreamCallback() {
    +                @Override
    +                public void process(final OutputStream out) throws 
IOException {
    +                    IOUtils.copy(toCopy, out);
    --- End diff --
    
    This throws errors in some cases, and they do not bubble out to the 
wrapping catch, so there is no indication that anything went wrong.  One 
particular instance was:
    
    > 2016-06-19 01:12:52,512 ERROR [Timer-Driven Process Thread-8] 
o.a.nifi.processors.alluxio.PutAlluxio 
PutAlluxio[id=37744f8c-57fe-48c9-ac0b-e3fd255efc4b] An error occurred while 
writing file /nifi-630c823e-7492-4539-bf14-632dfa4b85e2; transferring to 
'failure': java.lang.RuntimeException: No available Alluxio worker found
    2016-06-19 01:12:52,513 ERROR [Timer-Driven Process Thread-8] 
o.a.nifi.processors.alluxio.PutAlluxio
    java.lang.RuntimeException: No available Alluxio worker found
        at 
alluxio.client.block.AlluxioBlockStore.getOutStream(AlluxioBlockStore.java:174) 
~[alluxio-core-client-1.0.1.jar:na]
        at 
alluxio.client.file.FileOutStream.getNextBlock(FileOutStream.java:266) 
~[alluxio-core-client-1.0.1.jar:na]
        at alluxio.client.file.FileOutStream.write(FileOutStream.java:231) 
~[alluxio-core-client-1.0.1.jar:na]
        at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1793) 
~[commons-io-2.4.jar:2.4]
        at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1769) 
~[commons-io-2.4.jar:2.4]
        at org.apache.commons.io.IOUtils.copy(IOUtils.java:1744) 
~[commons-io-2.4.jar:2.4]


> Create GetTachyon and PutTachyon Processors
> -------------------------------------------
>
>                 Key: NIFI-1022
>                 URL: https://issues.apache.org/jira/browse/NIFI-1022
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Jeremy Dyer
>            Assignee: Pierre Villard
>            Priority: Minor
>         Attachments: Alluxio.xml
>
>
> Provide support for Apache Tachyon by implementing a GetTachyon and 
> PutTachyon processor. Having the ability to both read and write to Tachyon 
> would assist in sharing data with external applications such as Spark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to