http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java ---------------------------------------------------------------------- diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java index 0000000,31e5105..da80546 mode 000000,100644..100644 --- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java +++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java @@@ -1,0 -1,293 +1,293 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.stream.io.BufferedInputStream; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.Relationship; + import org.apache.nifi.processor.exception.FlowFileAccessException; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processors.standard.util.FileInfo; + import org.apache.nifi.processors.standard.util.FileTransfer; + import org.apache.nifi.processors.standard.util.SFTPTransfer; + import org.apache.nifi.util.ObjectHolder; + import org.apache.nifi.util.StopWatch; + + import java.io.File; + import java.io.IOException; + import java.io.InputStream; + import java.util.Collections; + import java.util.HashSet; + import java.util.Set; + import java.util.concurrent.TimeUnit; + + /** + * Base class for PutFTP & PutSFTP + * @param <T> + */ + public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractProcessor { + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that are successfully sent will be routed to success").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that failed to send to the remote system; failure is usually looped back to this processor").build(); + public static final Relationship REL_REJECT = new Relationship.Builder().name("reject").description("FlowFiles that were rejected by the destination system").build(); + + private final Set<Relationship> relationships; + + public PutFileTransfer() { + super(); + final Set<Relationship> relationships = 
new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_REJECT); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + protected abstract T getFileTransfer(final ProcessContext context); + + protected void beforePut(final FlowFile flowFile, final ProcessContext context, final T transfer) throws IOException { + + } + + protected void afterPut(final FlowFile flowFile, final ProcessContext context, final T transfer) throws IOException { + + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ProcessorLog logger = getLogger(); + final String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); + + final int maxNumberOfFiles = context.getProperty(FileTransfer.BATCH_SIZE).asInteger(); + int fileCount = 0; + try (final T transfer = getFileTransfer(context)) { + do { + final String rootPath = context.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions(flowFile).getValue(); + final String workingDirPath; + if (rootPath == null) { + workingDirPath = null; + } else { + File workingDirectory = new File(rootPath); + if (!workingDirectory.getPath().startsWith("/") && !workingDirectory.getPath().startsWith("\\")) { + workingDirectory = new File(transfer.getHomeDirectory(flowFile), workingDirectory.getPath()); + } + workingDirPath = workingDirectory.getPath().replace("\\", "/"); + } + + final boolean rejectZeroByteFiles = context.getProperty(FileTransfer.REJECT_ZERO_BYTE).asBoolean(); + final ConflictResult conflictResult = identifyAndResolveConflictFile(context.getProperty(FileTransfer.CONFLICT_RESOLUTION).getValue(), + transfer, workingDirPath, flowFile, rejectZeroByteFiles, logger); + + if 
(conflictResult.isTransfer()) { + final StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + + beforePut(flowFile, context, transfer); + final FlowFile flowFileToTransfer = flowFile; + final ObjectHolder<String> fullPathRef = new ObjectHolder<>(null); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + try (final InputStream bufferedIn = new BufferedInputStream(in)) { + if (workingDirPath != null && context.getProperty(SFTPTransfer.CREATE_DIRECTORY).asBoolean()) { + transfer.ensureDirectoryExists(flowFileToTransfer, new File(workingDirPath)); + } + + fullPathRef.set(transfer.put(flowFileToTransfer, workingDirPath, conflictResult.getFileName(), bufferedIn)); + } + } + }); + afterPut(flowFile, context, transfer); + + stopWatch.stop(); + final String dataRate = stopWatch.calculateDataRate(flowFile.getSize()); + final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + logger.info("Successfully transfered {} to {} on remote host {} in {} milliseconds at a rate of {}", + new Object[]{flowFile, fullPathRef.get(), hostname, millis, dataRate}); + + String fullPathWithSlash = fullPathRef.get(); + if (!fullPathWithSlash.startsWith("/")) { + fullPathWithSlash = "/" + fullPathWithSlash; + } + final String destinationUri = transfer.getProtocolName() + "://" + hostname + fullPathWithSlash; + session.getProvenanceReporter().send(flowFile, destinationUri, millis); + } + + if (conflictResult.isPenalize()) { + flowFile = session.penalize(flowFile); + } + + session.transfer(flowFile, conflictResult.getRelationship()); + session.commit(); - } while (isScheduled() && (getRelationships().size() == session.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.get()) != null)); ++ } while (isScheduled() && (getRelationships().size() == context.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.get()) != 
null)); + } catch (final IOException e) { + context.yield(); + logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } catch (final FlowFileAccessException e) { + context.yield(); + logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e.getCause()}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } catch (final ProcessException e) { + context.yield(); + logger.error("Unable to transfer {} to remote host {} due to {}: {}; routing to failure", new Object[]{flowFile, hostname, e, e.getCause()}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } + + //Attempts to identify naming or content issues with files before they are transferred. + private ConflictResult identifyAndResolveConflictFile(final String conflictResolutionType, final T transfer, final String path, final FlowFile flowFile, final boolean rejectZeroByteFiles, final ProcessorLog logger) throws IOException { + Relationship destinationRelationship = REL_SUCCESS; + String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + boolean transferFile = true; + boolean penalizeFile = false; + + //First, check if the file is empty + //Reject files that are zero bytes or less + if (rejectZeroByteFiles) { + final long sizeInBytes = flowFile.getSize(); + if (sizeInBytes == 0) { + logger.warn("Rejecting {} because it is zero bytes", new Object[]{flowFile}); + return new ConflictResult(REL_REJECT, false, fileName, true); + } + } + + //Second, check if the user doesn't care about detecting naming conflicts ahead of time + if (conflictResolutionType.equalsIgnoreCase(FileTransfer.CONFLICT_RESOLUTION_NONE)) { + return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile); + } + + final FileInfo remoteFileInfo = 
transfer.getRemoteFileInfo(flowFile, path, fileName); + if (remoteFileInfo == null) { + return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile); + } + + if (remoteFileInfo.isDirectory()) { + logger.info("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile}); + return new ConflictResult(REL_REJECT, false, fileName, false); + } + + logger.info("Discovered a filename conflict on the remote server for {} so handling using configured Conflict Resolution of {}", + new Object[]{flowFile, conflictResolutionType}); + + switch (conflictResolutionType.toUpperCase()) { + case FileTransfer.CONFLICT_RESOLUTION_REJECT: + destinationRelationship = REL_REJECT; + transferFile = false; + penalizeFile = false; + logger.info("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile}); + break; + case FileTransfer.CONFLICT_RESOLUTION_REPLACE: + transfer.deleteFile(path, fileName); + destinationRelationship = REL_SUCCESS; + transferFile = true; + penalizeFile = false; + logger.info("Resolving filename conflict for {} with remote server by deleting remote file and replacing with flow file", new Object[]{flowFile}); + break; + case FileTransfer.CONFLICT_RESOLUTION_RENAME: + boolean uniqueNameGenerated = false; + for (int i = 1; i < 100 && !uniqueNameGenerated; i++) { + String possibleFileName = i + "." 
+ fileName; + + final FileInfo renamedFileInfo = transfer.getRemoteFileInfo(flowFile, path, possibleFileName); + uniqueNameGenerated = (renamedFileInfo == null); + if (uniqueNameGenerated) { + fileName = possibleFileName; + logger.info("Attempting to resolve filename conflict for {} on the remote server by using a newly generated filename of: {}", new Object[]{flowFile, fileName}); + destinationRelationship = REL_SUCCESS; + transferFile = true; + penalizeFile = false; + break; + } + } + if (!uniqueNameGenerated) { + destinationRelationship = REL_REJECT; + transferFile = false; + penalizeFile = false; + logger.info("Could not determine a unique name after 99 attempts for. Switching resolution mode to REJECT for " + flowFile); + } + break; + case FileTransfer.CONFLICT_RESOLUTION_IGNORE: + destinationRelationship = REL_SUCCESS; + transferFile = false; + penalizeFile = false; + logger.info("Resolving conflict for {} by not transferring file and and still considering the process a success.", new Object[]{flowFile}); + break; + case FileTransfer.CONFLICT_RESOLUTION_FAIL: + destinationRelationship = REL_FAILURE; + transferFile = false; + penalizeFile = true; + logger.info("Resolved filename conflict for {} as configured by routing to FAILURE relationship.", new Object[]{flowFile}); + default: + break; + } + + return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile); + } + + /** + * static inner class to hold conflict data + */ + private static class ConflictResult { + + final Relationship relationship; + final boolean transferFile; + final String newFileName; + final boolean penalizeFile; + + public ConflictResult(final Relationship relationship, final boolean transferFileVal, final String newFileNameVal, final boolean penalizeFileVal) { + this.relationship = relationship; + this.transferFile = transferFileVal; + this.newFileName = newFileNameVal; + this.penalizeFile = penalizeFileVal; + } + + public boolean isTransfer() { + return 
transferFile; + } + + public boolean isPenalize() { + return penalizeFile; + } + + public String getFileName() { + return newFileName; + } + + public Relationship getRelationship() { + return relationship; + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java ---------------------------------------------------------------------- diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index 0000000,cae61f0..1cf5f1f mode 000000,100644..100644 --- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@@ -1,0 -1,320 +1,325 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard.servlets; + + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.security.cert.X509Certificate; + import java.util.Enumeration; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Map; + import java.util.Set; + import java.util.UUID; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.atomic.AtomicReference; + import java.util.regex.Pattern; + import java.util.zip.GZIPInputStream; + + import javax.servlet.ServletConfig; + import javax.servlet.ServletContext; + import javax.servlet.ServletException; + import javax.servlet.http.HttpServlet; + import javax.servlet.http.HttpServletRequest; + import javax.servlet.http.HttpServletResponse; + import javax.ws.rs.Path; + import javax.ws.rs.core.MediaType; + + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.stream.io.BufferedOutputStream; + import org.apache.nifi.stream.io.StreamThrottler; + import org.apache.nifi.logging.ProcessorLog; ++import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessSessionFactory; + import org.apache.nifi.processor.io.OutputStreamCallback; + import org.apache.nifi.processors.standard.ListenHTTP; + import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper; + import org.apache.nifi.util.FlowFileUnpackager; + import org.apache.nifi.util.FlowFileUnpackagerV1; + import org.apache.nifi.util.FlowFileUnpackagerV2; + import org.apache.nifi.util.FlowFileUnpackagerV3; + import org.apache.commons.io.IOUtils; + import org.apache.commons.lang3.StringUtils; + + @Path(ListenHTTP.URI) + public class ListenHTTPServlet extends HttpServlet { + + 
private static final long serialVersionUID = 5329940480987723163L; + + public static final String FLOWFILE_CONFIRMATION_HEADER = "x-prefer-acknowledge-uri"; + public static final String LOCATION_HEADER_NAME = "Location"; + public static final String DEFAULT_FOUND_SUBJECT = "none"; + public static final String APPLICATION_FLOW_FILE_V1 = "application/flowfile"; + public static final String APPLICATION_FLOW_FILE_V2 = "application/flowfile-v2"; + public static final String APPLICATION_FLOW_FILE_V3 = "application/flowfile-v3"; + public static final String LOCATION_URI_INTENT_NAME = "x-location-uri-intent"; + public static final String LOCATION_URI_INTENT_VALUE = "flowfile-hold"; + public static final int FILES_BEFORE_CHECKING_DESTINATION_SPACE = 5; + public static final String ACCEPT_HEADER_NAME = "Accept"; + public static final String ACCEPT_HEADER_VALUE = APPLICATION_FLOW_FILE_V3 + "," + APPLICATION_FLOW_FILE_V2 + "," + APPLICATION_FLOW_FILE_V1 + ",*/*;q=0.8"; + public static final String ACCEPT_ENCODING_NAME = "Accept-Encoding"; + public static final String ACCEPT_ENCODING_VALUE = "gzip"; + public static final String GZIPPED_HEADER = "flowfile-gzipped"; + public static final String PROTOCOL_VERSION_HEADER = "x-nifi-transfer-protocol-version"; + public static final String PROTOCOL_VERSION = "3"; + + private final AtomicLong filesReceived = new AtomicLong(0L); + private final AtomicBoolean spaceAvailable = new AtomicBoolean(true); + + private ProcessorLog logger; + private AtomicReference<ProcessSessionFactory> sessionFactoryHolder; ++ private volatile ProcessContext processContext; + private Pattern authorizedPattern; + private Pattern headerPattern; + private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap; + private StreamThrottler streamThrottler; + + /** + * + * @param config + * @throws ServletException + */ + @SuppressWarnings("unchecked") + @Override + public void init(final ServletConfig config) throws ServletException { + final ServletContext 
context = config.getServletContext(); + this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); + this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER); ++ this.processContext = (ProcessContext) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER); + this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); + this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN); + this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); + this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); + } + + @Override + protected void doHead(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { + response.addHeader(ACCEPT_ENCODING_NAME, ACCEPT_ENCODING_VALUE); + response.addHeader(ACCEPT_HEADER_NAME, ACCEPT_HEADER_VALUE); + response.addHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION); + } + + @Override + protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { ++ final ProcessContext context = processContext; ++ + ProcessSessionFactory sessionFactory; + do { + sessionFactory = sessionFactoryHolder.get(); + if (sessionFactory == null) { + try { + Thread.sleep(10); + } catch (final InterruptedException e) { + } + } + } while (sessionFactory == null); + + final ProcessSession session = sessionFactory.createSession(); + FlowFile flowFile = null; + String holdUuid = null; + String foundSubject = null; + try { + final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE; + if (n == 0 || !spaceAvailable.get()) { - if (session.getAvailableRelationships().isEmpty()) { ++ if 
(context.getAvailableRelationships().isEmpty()) { + spaceAvailable.set(false); + if (logger.isDebugEnabled()) { + logger.debug("Received request from " + request.getRemoteHost() + " but no space available; Indicating Service Unavailable"); + } + response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + return; + } else { + spaceAvailable.set(true); + } + } + response.setHeader("Content-Type", MediaType.TEXT_PLAIN); + + final boolean contentGzipped = Boolean.parseBoolean(request.getHeader(GZIPPED_HEADER)); + + final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate"); + foundSubject = DEFAULT_FOUND_SUBJECT; + if (certs != null && certs.length > 0) { + for (final X509Certificate cert : certs) { + foundSubject = cert.getSubjectDN().getName(); + if (authorizedPattern.matcher(foundSubject).matches()) { + break; + } else { + logger.warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + request.getRemoteHost()); + response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on dn"); + return; + } + } + } + + final String destinationVersion = request.getHeader(PROTOCOL_VERSION_HEADER); + Integer protocolVersion = null; + if (destinationVersion != null) { + try { + protocolVersion = Integer.valueOf(destinationVersion); + } catch (final NumberFormatException e) { + // Value was invalid. Treat as if the header were missing. + } + } + + final boolean destinationIsLegacyNiFi = (protocolVersion == null); + final boolean createHold = Boolean.parseBoolean(request.getHeader(FLOWFILE_CONFIRMATION_HEADER)); + final String contentType = request.getContentType(); + + final InputStream unthrottled = contentGzipped ? new GZIPInputStream(request.getInputStream()) : request.getInputStream(); + + final InputStream in = (streamThrottler == null) ? 
unthrottled : streamThrottler.newThrottledInputStream(unthrottled); + + if (logger.isDebugEnabled()) { + logger.debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped); + } + + final AtomicBoolean hasMoreData = new AtomicBoolean(false); + final FlowFileUnpackager unpackager; + if (APPLICATION_FLOW_FILE_V3.equals(contentType)) { + unpackager = new FlowFileUnpackagerV3(); + } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) { + unpackager = new FlowFileUnpackagerV2(); + } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) { + unpackager = new FlowFileUnpackagerV1(); + } else { + unpackager = null; + } + + final Set<FlowFile> flowFileSet = new HashSet<>(); + + do { + final long startNanos = System.nanoTime(); + final Map<String, String> attributes = new HashMap<>(); + flowFile = session.create(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) { + if (unpackager == null) { + IOUtils.copy(in, bos); + hasMoreData.set(false); + } else { + attributes.putAll(unpackager.unpackageFlowFile(in, bos)); + + if (destinationIsLegacyNiFi) { + if (attributes.containsKey("nf.file.name")) { + // for backward compatibility with old nifi... 
+ attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name")); + } + + if (attributes.containsKey("nf.file.path")) { + attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path")); + } + } + + // remove deprecated FlowFile attribute that was used in older versions of NiFi + attributes.remove("parent.uuid"); + + hasMoreData.set(unpackager.hasMoreData()); + } + } + } + }); + + final long transferNanos = System.nanoTime() - startNanos; + final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); + + // put metadata on flowfile + final String nameVal = request.getHeader(CoreAttributes.FILENAME.key()); + if (StringUtils.isNotBlank(nameVal)) { + attributes.put(CoreAttributes.FILENAME.key(), nameVal); + } + + // put arbitrary headers on flow file + for(Enumeration<String> headerEnum = request.getHeaderNames(); + headerEnum.hasMoreElements(); ) { + String headerName = headerEnum.nextElement(); + if (headerPattern != null && headerPattern.matcher(headerName).matches()) { + String headerValue = request.getHeader(headerName); + attributes.put(headerName, headerValue); + } + } + + String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key()); + if (sourceSystemFlowFileIdentifier != null) { + sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier; + + // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's + // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event + attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + } + + flowFile = session.putAllAttributes(flowFile, attributes); + session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis); + flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject); + flowFileSet.add(flowFile); + + 
if (holdUuid == null) { + holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key()); + } + } while (hasMoreData.get()); + + if (createHold) { + String uuid = (holdUuid == null) ? UUID.randomUUID().toString() : holdUuid; + + if (flowFileMap.containsKey(uuid)) { + uuid = UUID.randomUUID().toString(); + } + + final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis()); + FlowFileEntryTimeWrapper previousWrapper; + do { + previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper); + if (previousWrapper != null) { + uuid = UUID.randomUUID().toString(); + } + } while (previousWrapper != null); + + response.setStatus(HttpServletResponse.SC_SEE_OTHER); + final String ackUri = ListenHTTP.URI + "/holds/" + uuid; + response.addHeader(LOCATION_HEADER_NAME, ackUri); + response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE); + response.getOutputStream().write(ackUri.getBytes("UTF-8")); + if (logger.isDebugEnabled()) { + logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}", + new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid}); + } + } else { + response.setStatus(HttpServletResponse.SC_OK); + logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success' {}", + new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFile}); + + session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS); + session.commit(); + } + } catch (final Throwable t) { + session.rollback(); + if (flowFile == null) { + logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t}); + } else { + logger.error("Unable to receive file {} from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{flowFile, request.getRemoteHost(), foundSubject, t}); + } + 
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString()); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java ---------------------------------------------------------------------- diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java index 0000000,a6402e4..ab4c978 mode 000000,100644..100644 --- a/nifi/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java +++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java @@@ -1,0 -1,138 +1,139 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import org.apache.nifi.processors.standard.DistributeLoad; + import org.apache.nifi.util.TestRunner; + import org.apache.nifi.util.TestRunners; + + import org.junit.Assert; + import org.junit.BeforeClass; + import org.junit.Test; + + public class TestDistributeLoad { + + @BeforeClass + public static void before() { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DistributeLoad", "debug"); + } + + @Test + public void testDefaultRoundRobin() { + final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad()); + testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100"); + + for (int i = 0; i < 101; i++) { + testRunner.enqueue(new byte[0]); + } + + testRunner.run(101); + testRunner.assertTransferCount("1", 2); + for (int i = 2; i <= 100; i++) { + testRunner.assertTransferCount(String.valueOf(i), 1); + } + } + + @Test + public void testWeightedRoundRobin() { + final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad()); + testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100"); + + testRunner.setProperty("1", "5"); + testRunner.setProperty("2", "3"); + + for (int i = 0; i < 106; i++) { + testRunner.enqueue(new byte[0]); + } + + testRunner.run(108); + testRunner.assertTransferCount("1", 5); + testRunner.assertTransferCount("2", 3); + for (int i = 3; i <= 100; i++) { + testRunner.assertTransferCount(String.valueOf(i), 1); + } + } + + @Test + public void testValidationOnAddedProperties() { + final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad()); + testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100"); + + testRunner.setProperty("1", "5"); + + try { + testRunner.setProperty("1", "0"); + Assert.fail("Allows property '1' to be set to '0'"); + } catch (final AssertionError e) { + 
// expected behavior + } + + try { + testRunner.setProperty("1", "-1"); + Assert.fail("Allows property '1' to be set to '-1'"); + } catch (final AssertionError e) { + // expected behavior + } + + testRunner.setProperty("1", "101"); + testRunner.setProperty("100", "5"); + + try { + testRunner.setProperty("101", "5"); + Assert.fail("Allows property '101' to be set to '5'"); + } catch (final AssertionError e) { + // expected behavior + } + + try { + testRunner.setProperty("0", "5"); + Assert.fail("Allows property '0' to be set to '5'"); + } catch (final AssertionError e) { + // expected behavior + } + + try { + testRunner.setProperty("-1", "5"); + Assert.fail("Allows property '-1' to be set to '5'"); + } catch (final AssertionError e) { + // expected behavior + } + } + + @Test + public void testNextAvailable() { + final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad()); + + testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS.getName(), "100"); + testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_NEXT_AVAILABLE); + + for (int i = 0; i < 99; i++) { + testRunner.enqueue(new byte[0]); + } + + testRunner.setRelationshipUnavailable("50"); + + testRunner.run(101); + testRunner.assertQueueEmpty(); + + for (int i = 1; i <= 100; i++) { ++ System.out.println(i); + testRunner.assertTransferCount(String.valueOf(i), (i == 50) ? 
0 : 1); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java ---------------------------------------------------------------------- diff --cc nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java index 0000000,9e04439..7fa183f mode 000000,100644..100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java @@@ -1,0 -1,124 +1,132 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processor; + + import java.util.Map; ++import java.util.Set; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.PropertyValue; + import org.apache.nifi.controller.ControllerServiceLookup; + + /** + * <p> + * Provides a bridge between a Processor and the NiFi Framework + * </p> + * + * <p> + * <b>Note: </b>Implementations of this interface are NOT necessarily + * thread-safe. 
+ * </p> + */ + public interface ProcessContext { + + /** + * Retrieves the current value set for the given descriptor, if a value is + * set - else uses the descriptor to determine the appropriate default value + * + * @param descriptor + * @return + */ + PropertyValue getProperty(PropertyDescriptor descriptor); + + /** + * Retrieves the current value set for the given descriptor, if a value is + * set - else uses the descriptor to determine the appropriate default value + * + * @param propertyName + * @return + */ + PropertyValue getProperty(String propertyName); + + /** + * Creates and returns a {@link PropertyValue} object that can be used for + * evaluating the value of the given String + * + * @param rawValue + * @return + */ + PropertyValue newPropertyValue(String rawValue); + + /** + * <p> + * Causes the Processor not to be scheduled for some pre-configured amount + * of time. The duration of time for which the processor will not be + * scheduled is configured in the same manner as the processor's scheduling + * period. + * </p> + * + * <p> + * <b>Note: </b>This is NOT a blocking call and does not suspend execution + * of the current thread. + * </p> + */ + void yield(); + + /** + * @return the maximum number of threads that may be executing this + * processor's code at any given time + */ + int getMaxConcurrentTasks(); + + /** + * @return the annotation data configured for this processor + */ + String getAnnotationData(); + + /** + * Returns a Map of all PropertyDescriptors to their configured values. 
This + * Map may or may not be modifiable, but modifying its values will not + * change the values of the processor's properties + * + * @return + */ + Map<PropertyDescriptor, String> getProperties(); + + /** + * Encrypts the given value using the password provided in the NiFi + * Properties + * + * @param unencrypted + * @return + */ + String encrypt(String unencrypted); + + /** + * Decrypts the given value using the password provided in the NiFi + * Properties + * + * @param encrypted + * @return + */ + String decrypt(String encrypted); + + /** + * Provides a {@code ControllerServiceLookup} that can be used to obtain a + * Controller Service + * + * @return + */ + ControllerServiceLookup getControllerServiceLookup(); ++ ++ /** ++ * @return the set of all relationships for which space is available to ++ * receive new objects ++ */ ++ Set<Relationship> getAvailableRelationships(); ++ + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java ---------------------------------------------------------------------- diff --cc nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java index 0000000,09d1bd2..d3de916 mode 000000,100644..100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java @@@ -1,0 -1,719 +1,713 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processor; + + import java.io.InputStream; + import java.io.OutputStream; + import java.nio.file.Path; + import java.util.Collection; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.regex.Pattern; + + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.processor.exception.FlowFileAccessException; + import org.apache.nifi.processor.exception.FlowFileHandlingException; + import org.apache.nifi.processor.exception.MissingFlowFileException; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.io.OutputStreamCallback; + import org.apache.nifi.processor.io.StreamCallback; + import org.apache.nifi.provenance.ProvenanceReporter; + + /** + * <p> + * A process session encompasses all the behaviors a processor can perform to + * obtain, clone, read, modify remove FlowFiles in an atomic unit. A process + * session is always tied to a single processor at any one time and ensures no + * FlowFile can ever be accessed by any more than one processor at a given time. + * The session also ensures that all FlowFiles are always accounted for. The + * creator of a ProcessSession is always required to manage the session.</p> + * + * <p> + * A session is not considered thread safe. 
The session supports a unit of work + * that is either committed or rolled back</p> + * + * <p> + * As noted on specific methods and for specific exceptions automated rollback + * will occur to ensure consistency of the repository. However, several + * situations can result in exceptions yet not cause automated rollback. In + * these cases the consistency of the repository will be retained but callers + * will be able to indicate whether it should result in rollback or continue on + * toward a commit.</p> + * + * <p> + * A process session instance may be used continuously. That is, after each + * commit or rollback, the session can be used again.</p> + * + * @author unattributed + */ + public interface ProcessSession { + + /** + * <p> + * Commits the current session ensuring all operations against FlowFiles + * within this session are atomically persisted. All FlowFiles operated on + * within this session must be accounted for by transfer or removal or the + * commit will fail.</p> + * + * <p> + * As soon as the commit completes the session is again ready to be used</p> + * + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session. + * @throws FlowFileHandlingException if not all FlowFiles acted upon within + * this session are accounted for by user code such that they have a + * transfer identified or where marked for removal. Automated rollback + * occurs. + * @throws ProcessException if some general fault occurs while persisting + * the session. Initiates automatic rollback. The root cause can be obtained + * via <code>Exception.getCause()</code> + */ + void commit(); + + /** + * Reverts any changes made during this session. All FlowFiles are restored + * back to their initial session state and back to their original queues. If + * this session is already committed or rolled back then no changes will + * occur. This method can be called any number of times. 
Calling this method + * is identical to calling {@link #rollback(boolean)} passing + * <code>false</code> as the parameter. + */ + void rollback(); + + /** + * Reverts any changes made during this session. All FlowFiles are restored + * back to their initial session state and back to their original queues, + * after optionally being penalized. If this session is already committed or + * rolled back then no changes will occur. This method can be called any + * number of times. + * + * @param penalize whether or not the FlowFiles that are being restored back + * to their queues should be penalized + */ + void rollback(boolean penalize); + + /** + * Adjusts counter data for the given counter name and takes care of + * registering the counter if not already present. The adjustment occurs + * only if and when the ProcessSession is committed. + * + * @param name the name of the counter + * @param delta the delta by which to modify the counter (+ or -) + * @param immediate if true, the counter will be updated immediately, + * without regard to whether the ProcessSession is commit or rolled back; + * otherwise, the counter will be incremented only if and when the + * ProcessSession is committed. + */ + void adjustCounter(String name, long delta, boolean immediate); + + /** + * @return FlowFile that is next highest priority FlowFile to process. + * Otherwise returns null. + */ + FlowFile get(); + + /** + * Returns up to <code>maxResults</code> FlowFiles from the work queue. If + * no FlowFiles are available, returns an empty list. Will not return null. + * If multiple incoming queues are present, the behavior is unspecified in + * terms of whether all queues or only a single queue will be polled in a + * single call. 
+ * + * @param maxResults the maximum number of FlowFiles to return + * @return + * @throws IllegalArgumentException if <code>maxResults</code> is less than + * 0 + */ + List<FlowFile> get(int maxResults); + + /** + * <p> + * Returns all FlowFiles from all of the incoming queues for which the given + * {@link FlowFileFilter} indicates should be accepted. Calls to this method + * provide exclusive access to the underlying queues. I.e., no other thread + * will be permitted to pull FlowFiles from this Processor's queues or add + * FlowFiles to this Processor's incoming queues until this method call has + * returned. + * </p> + * + * @param filter + * @return + */ + List<FlowFile> get(FlowFileFilter filter); + + /** + * @return the QueueSize that represents the number of FlowFiles and their + * combined data size for all FlowFiles waiting to be processed by the + * Processor that owns this ProcessSession, regardless of which Connection + * the FlowFiles live on + */ + QueueSize getQueueSize(); + + /** - * @return the set of all relationships for which space is available to - * receive new objects - */ - Set<Relationship> getAvailableRelationships(); - - /** + * Creates a new FlowFile in the repository with no content and without any + * linkage to a parent FlowFile. This method is appropriate only when data + * is received or created from an external system. Otherwise, this method + * should be avoided and should instead use {@link #create(FlowFile)} or + * {@link #create(Collection<FlowFile>)}. + * + * When this method is used, a Provenance CREATE or RECEIVE Event should be + * generated. See the {@link #getProvenanceReporter()} method and + * {@link ProvenanceReporter} class for more information + * + * @return newly created FlowFile + */ + FlowFile create(); + + /** + * Creates a new FlowFile in the repository with no content but with a + * parent linkage to <code>parent</code>. 
The newly created FlowFile will + * inherit all of the parent's attributes except for the UUID. This method + * will automatically generate a Provenance FORK event or a Provenance JOIN + * event, depending on whether or not other FlowFiles are generated from the + * same parent before the ProcessSession is committed. + * + * @param parent + * @return + */ + FlowFile create(FlowFile parent); + + /** + * Creates a new FlowFile in the repository with no content but with a + * parent linkage to the FlowFiles specified by the parents Collection. The + * newly created FlowFile will inherit all of the attributes that are in + * common to all parents (except for the UUID, which will be in common if + * only a single parent exists). This method will automatically generate a + * Provenance JOIN event. + * + * @param parents + * @return + */ + FlowFile create(Collection<FlowFile> parents); + + /** + * Creates a new FlowFile that is a clone of the given FlowFile as of the + * time this is called, both in content and attributes. This method + * automatically emits a Provenance CLONE Event. + * + * @param example FlowFile to be the source of cloning - given FlowFile must + * be a part of the given session + * @return FlowFile that is a clone of the given example + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be reference, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. 
+ * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content + * @throws NullPointerException if the argument null + */ + FlowFile clone(FlowFile example); + + /** + * Creates a new FlowFile whose parent is the given FlowFile. The content of + * the new FlowFile will be a subset of the byte sequence of the given + * FlowFile starting at the specified offset and with the length specified. + * The new FlowFile will contain all of the attributes of the original. This + * method automatically emits a Provenance FORK Event (or a Provenance CLONE + * Event, if the offset is 0 and the size is exactly equal to the size of + * the example FlowFile). + * + * @param example + * @param offset + * @param size + * @return a FlowFile with the specified size whose parent is first argument + * to this function + * + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session, or if the + * specified offset + size exceeds that of the size of the example FlowFile. + * Automatic rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be reference, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + */ + FlowFile clone(FlowFile example, long offset, long size); + + /** + * Sets a penalty for the given FlowFile which will make it unavailable to + * be operated on any further during the penalty period. 
+ * + * @param flowFile to penalize + * @return FlowFile the new FlowFile reference to use + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws NullPointerException if the argument null + */ + FlowFile penalize(FlowFile flowFile); + + /** + * Updates the given FlowFiles attributes with the given key/value pair. If + * the key is named {@code uuid}, this attribute will be ignored. + * + * @param flowFile to update + * @param key of attribute + * @param value of attribute + * @return FlowFile the updated FlowFile + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws NullPointerException if an argument is null + */ + FlowFile putAttribute(FlowFile flowFile, String key, String value); + + /** + * Updates the given FlowFiles attributes with the given key/value pairs. If + * the map contains a key named {@code uuid}, this attribute will be + * ignored. + * + * @param flowFile to update + * @param attributes the attributes to add to the given FlowFile + * @return FlowFile the updated FlowFile + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws NullPointerException if an argument is null + */ + FlowFile putAllAttributes(FlowFile flowFile, Map<String, String> attributes); + + /** + * Removes the given FlowFile attribute with the given key. 
If the key is + * named {@code uuid}, this method will return the same FlowFile without + * removing any attribute. + * + * @param flowFile to update + * @param key of attribute + * @return FlowFile the updated FlowFile + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws NullPointerException if the argument null + */ + FlowFile removeAttribute(FlowFile flowFile, String key); + + /** + * Removes the attributes with the given keys from the given FlowFile. If + * the set of keys contains the value {@code uuid}, this key will be ignored + * + * @param flowFile to update + * @param keys of attribute + * @return FlowFile the updated FlowFile + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws NullPointerException if the argument null + */ + FlowFile removeAllAttributes(FlowFile flowFile, Set<String> keys); + + /** + * Remove all attributes from the given FlowFile that have keys which match + * the given pattern. If the pattern matches the key {@code uuid}, this key + * will not be removed. 
+ * + * @param flowFile to update + * @param keyPattern may be null; if supplied is matched against each of the + * FlowFile attribute keys + * @return FlowFile containing only attributes which did not meet the key + * pattern + */ + FlowFile removeAllAttributes(FlowFile flowFile, Pattern keyPattern); + + /** + * Transfers the given FlowFile to the appropriate destination processor + * work queue(s) based on the given relationship. If the relationship leads + * to more than one destination the state of the FlowFile is replicated such + * that each destination receives an exact copy of the FlowFile though each + * will have its own unique identity. The destination processors will not be + * able to operate on the given FlowFile until this session is committed or + * until the ownership of the session is migrated to another processor. If + * ownership of the session is passed to a destination processor then that + * destination processor will have immediate visibility of the transferred + * FlowFiles within the session. + * + * @param flowFile + * @param relationship + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws NullPointerException if the argument null + * @throws IllegalArgumentException if given relationship is not a known or + * registered relationship + */ + void transfer(FlowFile flowFile, Relationship relationship); + + /** + * Transfers the given FlowFile back to the work queue from which it was + * pulled. The processor will not be able to operate on the given FlowFile + * until this session is committed. Any modifications that have been made to + * the FlowFile will be maintained. 
FlowFiles that are created by the + * processor cannot be transferred back to themselves via this method. + * + * @param flowFile + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws IllegalArgumentException if the FlowFile was created by this + * processor + * @throws NullPointerException if the argument null + */ + void transfer(FlowFile flowFile); + + /** + * Transfers the given FlowFiles back to the work queues from which the + * FlowFiles were pulled. The processor will not be able to operate on the + * given FlowFile until this session is committed. Any modifications that + * have been made to the FlowFile will be maintained. FlowFiles that are + * created by the processor cannot be transferred back to themselves via + * this method. + * + * @param flowFiles + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFiles are already + * transferred or removed or don't belong to this session. Automatic + * rollback will occur. + * @throws IllegalArgumentException if the FlowFile was created by this + * processor + * @throws NullPointerException if the argument null + */ + void transfer(Collection<FlowFile> flowFiles); + + /** + * Transfers the given FlowFile to the appropriate destination processor + * work queue(s) based on the given relationship. If the relationship leads + * to more than one destination the state of the FlowFile is replicated such + * that each destination receives an exact copy of the FlowFile though each + * will have its own unique identity. 
The destination processors will not be + * able to operate on the given FlowFile until this session is committed or + * until the ownership of the session is migrated to another processor. If + * ownership of the session is passed to a destination processor then that + * destination processor will have immediate visibility of the transferred + * FlowFiles within the session. + * + * @param flowFiles + * @param relationship + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws NullPointerException if the argument null + * @throws IllegalArgumentException if given relationship is not a known or + * registered relationship + */ + void transfer(Collection<FlowFile> flowFiles, Relationship relationship); + + /** + * Ends the managed persistence for the given FlowFile. The persistent + * attributes for the FlowFile are deleted and so is the content assuming + * nothing else references it and this FlowFile will no longer be available + * for further operation. + * + * @param flowFile + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + */ + void remove(FlowFile flowFile); + + /** + * Ends the managed persistence for the given FlowFiles. The persistent + * attributes for the FlowFile are deleted and so is the content assuming + * nothing else references it and this FlowFile will no longer be available + * for further operation. 
+ * + * @param flowFiles + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if any of the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + */ + void remove(Collection<FlowFile> flowFiles); + + /** + * Executes the given callback against the contents corresponding to the + * given FlowFile. + * + * @param source + * @param reader + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be reference, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content + */ + void read(FlowFile source, InputStreamCallback reader); + + /** + * Combines the content of all given source FlowFiles into a single given + * destination FlowFile. + * + * @param sources + * @param destination + * @return updated destination FlowFile (new size, etc...) + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws IllegalArgumentException if the given destination is contained + * within the sources + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. 
+ * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be reference, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content. The state of the destination will be as it was prior to + * this call. + */ + FlowFile merge(Collection<FlowFile> sources, FlowFile destination); + + /** + * Combines the content of all given source FlowFiles into a single given + * destination FlowFile, optionally preceded by a header, followed by a + * footer, and with a demarcator placed between each source. + * + * @param sources the FlowFiles whose content will be combined; must not + * contain the destination + * @param destination the FlowFile that will receive the combined content + * @param header bytes that will be added to the beginning of the merged + * output. May be null or empty. + * @param footer bytes that will be added to the end of the merged output. + * May be null or empty. + * @param demarcator bytes that will be placed in between each object merged + * together. May be null or empty. + * @return updated destination FlowFile (new size, etc...) + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws IllegalArgumentException if the given destination is contained + * within the sources + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content. The state of the destination will be as it was prior to + * this call.
+ */ + FlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator); + + /** + * Executes the given callback against the content corresponding to the + * given FlowFile. + * + * @param source the FlowFile whose content the callback operates on + * @param writer the callback to execute against the FlowFile's content + * @return updated FlowFile + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content + */ + FlowFile write(FlowFile source, OutputStreamCallback writer); + + /** + * Executes the given callback against the content corresponding to the + * given FlowFile. + * + * @param source the FlowFile whose content the callback operates on + * @param writer the callback to execute against the FlowFile's content + * @return updated FlowFile + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content + */ + FlowFile write(FlowFile source, StreamCallback writer); + + /** + * Executes the given callback against the content corresponding to the + * given FlowFile, such that any data written to the OutputStream of the + * content will be appended to the end of the FlowFile. + * + * @param source the FlowFile whose content will be appended to + * @param writer the callback whose output is appended to the content + * @return updated FlowFile + */ + FlowFile append(FlowFile source, OutputStreamCallback writer); + + /** + * Writes to the given FlowFile all content from the given content path. + * + * @param source the file from which content will be obtained + * @param keepSourceFile if true the content is simply copied; if false the + * original content might be used in a destructive way for efficiency such + * that the repository will have the data but the original data will be + * gone. If false the source object will be removed or gone once imported. + * It will not be restored if the session is rolled back so this must be + * used with caution. In some cases it can result in tremendous efficiency + * gains but is also dangerous. + * @param destination the FlowFile whose content will be updated + * @return the updated destination FlowFile (new size) + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content + */ + FlowFile importFrom(Path source, boolean keepSourceFile, FlowFile destination); + + /** + * Writes to the given FlowFile all content read from the given InputStream. + * + * @param source the stream from which content will be obtained + * @param destination the FlowFile whose content will be updated + * @return the updated destination FlowFile (new size) + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content + */ + FlowFile importFrom(InputStream source, FlowFile destination); + + /** + * Writes the content of the given FlowFile to the given destination path. + * + * @param flowFile the FlowFile whose content will be written + * @param destination the path to which the content will be written + * @param append if true will append to the current content at the given + * path; if false will replace any current content + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found.
The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content + */ + void exportTo(FlowFile flowFile, Path destination, boolean append); + + /** + * Writes the content of the given FlowFile to the given destination stream. + * + * @param flowFile the FlowFile whose content will be written + * @param destination the stream to which the content will be written + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content + */ + void exportTo(FlowFile flowFile, OutputStream destination); + + /** + * Returns a ProvenanceReporter that is tied to this ProcessSession. + * + * @return the ProvenanceReporter tied to this ProcessSession + */ + ProvenanceReporter getProvenanceReporter(); + }