[ https://issues.apache.org/jira/browse/NIFI-1022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342816#comment-15342816 ]
ASF GitHub Bot commented on NIFI-1022: -------------------------------------- Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/379#discussion_r67958810 --- Diff: nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/GetAlluxio.java --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.alluxio; + +import alluxio.AlluxioURI; +import alluxio.client.ReadType; +import alluxio.client.file.FileInStream; +import alluxio.client.file.URIStatus; +import alluxio.client.file.options.OpenFileOptions; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Tags({"alluxio", "tachyon", "get", "file"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@CapabilityDescription("This processor will access the file using the input URI provided and write the content of " + + "the remote file to the content of the incoming FlowFile.") +public class GetAlluxio extends AbstractAlluxioProcessor { + + public static final PropertyDescriptor READ_TYPE = new PropertyDescriptor.Builder() + .name("alluxio-read-type") + .displayName("Read type") + .description("The Read Type to use when accessing the remote file") + .defaultValue(ReadType.CACHE_PROMOTE.toString()) + .required(true) + .allowableValues(ReadType.values()) + .build(); + + private final static List<PropertyDescriptor> 
propertyDescriptors; + + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All files successfully retrieved are routed to this relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("In case of failure, flow files will be routed to this relationship") + .autoTerminateDefault(true) + .build(); + public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() + .name("original") + .description("In case of success, the original FlowFile will be routed to this relationship") + .autoTerminateDefault(true) + .build(); + + private final static Set<Relationship> relationships; + + + /* + * Will ensure that the list of property descriptors is build only once. + * Will also create a Set of relationships + */ + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.addAll(descriptors); + _propertyDescriptors.add(READ_TYPE); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + _relationships.add(REL_SUCCESS_REQ); + relationships = Collections.unmodifiableSet(_relationships); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + + @OnScheduled + public void onScheduled(final ProcessContext context) { + createFileSystem(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile request = null; + if (context.hasIncomingConnection()) { + request = session.get(); + + // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. 
+ // However, if we have no FlowFile and we have connections coming from other Processors, then + // we know that we should run only if we have a FlowFile. + if (request == null && context.hasNonLoopConnection()) { + return; + } + } + + final StopWatch stopWatch = new StopWatch(true); + final String uri = context.getProperty(URI).evaluateAttributeExpressions(request).getValue(); + final AlluxioURI path = new AlluxioURI(uri); + final OpenFileOptions options = OpenFileOptions.defaults().setReadType(ReadType.valueOf(context.getProperty(READ_TYPE).getValue())); + + FileInStream in = null; + FlowFile flowFile = null; + + if(request == null) { + flowFile = session.create(request); + } else { + flowFile = session.create(); + } + + try { + final URIStatus status = fileSystem.get().getStatus(path); + flowFile = updateFlowFile(status, flowFile, session); + + in = fileSystem.get().openFile(path, options); + final FileInStream toCopy = in; + + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + IOUtils.copy(toCopy, out); --- End diff -- I'll have to scope this out again. After the fact I realized that it may have just been the bulletin bug that was on master that was causing bulletins not to work. When I next get the opportunity, I'll see if that's still the case. For some additional context, in the logs, which I now realize I failed to capture, there was mention of the Alluxio logger. I was curious if that was somehow interfering with NiFi's logger and preventing bulletins. I'll let you know if I can recreate. 
> Create GetTachyon and PutTachyon Processors > ------------------------------------------- > > Key: NIFI-1022 > URL: https://issues.apache.org/jira/browse/NIFI-1022 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Jeremy Dyer > Assignee: Pierre Villard > Priority: Minor > Attachments: Alluxio.xml > > > Provide support for Apache Tachyon by implementing a GetTachyon and > PutTachyon processor. Having the ability to both read and write to Tachyon > would assist in sharing data with external applications such as Spark. -- This message was sent by Atlassian JIRA (v6.3.4#6332)