Repository: incubator-nifi Updated Branches: refs/heads/NIFI-190 [created] 6061475ed
NIFI-190: Initial commit of HoldFile processor Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8177ae4f Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8177ae4f Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8177ae4f Branch: refs/heads/NIFI-190 Commit: 8177ae4f1f8a6a3de23685f8068def23fe2b6b01 Parents: ac3c3bb Author: gresockj <jgres...@gmail.com> Authored: Fri Dec 19 22:46:42 2014 -0500 Committer: gresockj <jgres...@gmail.com> Committed: Sat Dec 27 05:48:52 2014 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/HoldFile.java | 283 +++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../index.html | 158 +++++++++++ .../nifi/processors/standard/TestHoldFile.java | 155 ++++++++++ 4 files changed, 597 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8177ae4f/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java new file mode 100644 index 0000000..aa06560 --- /dev/null +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java @@ -0,0 +1,283 @@ +package org.apache.nifi.processors.standard; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.commons.lang.StringUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.annotation.CapabilityDescription; +import org.apache.nifi.processor.annotation.OnScheduled; +import org.apache.nifi.processor.annotation.Tags; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +@Tags({"hold", "release", "signal"}) +@CapabilityDescription("Holds incoming flow files until a matching signal flow file enters the processor. " + + "Incoming files are classified as either held files or signals. " + + "Held files are routed to the Held relationship until a matching signal has been received.") +public class HoldFile extends AbstractProcessor { + public static final String FLOW_FILE_RELEASE_VALUE = "flow.file.release.value"; + public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); + + public static final PropertyDescriptor MAX_SIGNAL_AGE = new PropertyDescriptor + .Builder().name("Max Signal Age") + .description("The maximum age of a signal that will trigger a file to be released. " + + "Expected format is <duration> <time unit> where <duration> is a positive " + + "integer and <time unit> is one of seconds, minutes, hours") + .required(true).defaultValue("24 hours") + .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) + .build(); + public static final PropertyDescriptor RELEASE_SIGNAL_ATTRIBUTE = new PropertyDescriptor + .Builder().name("Release Signal Attribute") + .description("The flow file attribute name on held files that will be checked against values in the signal cache.") + .required(true). + addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) + .build(); + public static final PropertyDescriptor COPY_SIGNAL_ATTRIBUTES = new PropertyDescriptor + .Builder().name("Copy Signal Attributes?") + .description("If true, a signal's flow file attributes will be copied to its matching held files, " + + "with the exception of flow.file.release.value and the configured Signal Failure Attribute") + .required(true) + .defaultValue("true") + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor FAILURE_ATTRIBUTE = new PropertyDescriptor + .Builder().name("Signal Failure Attribute") + .description("Signals that have this attribute set to 'true' " + + "will cause matching held flow files to route to Failure. If this attribute " + + "is not populated, it is assumed that the flow file succeeds.") + .required(false) + .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) + .build(); + + public static final Relationship REL_HOLD = new Relationship.Builder() + .name("hold").description("Held files whose signals have not been received are routed here").build(); + + public static final Relationship REL_RELEASE = new Relationship.Builder() + .name("release").description("Held files whose signals have been received are routed here").build(); + + public static final Relationship REL_EXPIRED = new Relationship.Builder() + .name("expired").description("Held files that expire are routed here").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure").description("Held files whose signal contains the Signal Failure Attribute are " + + "routed here, indicating a processing failure upstream").build(); + + private Set<String> excludedAttributes = new HashSet<>(); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> descriptors; + + private String failureAttribute; + private volatile Map<String, ReleaseAttributes> releaseValues = new ConcurrentHashMap<>(); + private long expirationDuration = 0L; + private String releaseSignalAttribute; + private boolean copyAttributes; + + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships .add(REL_HOLD); + relationships .add(REL_RELEASE); + relationships .add(REL_EXPIRED); + relationships .add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(RELEASE_SIGNAL_ATTRIBUTE); + descriptors.add(MAX_SIGNAL_AGE); + descriptors.add(COPY_SIGNAL_ATTRIBUTES); + descriptors.add(FAILURE_ATTRIBUTE); + this.descriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public final Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + this.expirationDuration = context.getProperty(MAX_SIGNAL_AGE) + .asTimePeriod(TimeUnit.MILLISECONDS); + this.releaseSignalAttribute = context.getProperty( + RELEASE_SIGNAL_ATTRIBUTE).getValue(); + this.copyAttributes = context.getProperty( + COPY_SIGNAL_ATTRIBUTES).asBoolean(); + PropertyValue failureAttrValue = context.getProperty(FAILURE_ATTRIBUTE); + if (failureAttrValue != null) { + this.failureAttribute = failureAttrValue.getValue(); + } + + this.excludedAttributes.clear(); + this.excludedAttributes.add(FLOW_FILE_RELEASE_VALUE); + if (this.failureAttribute != null) { + this.excludedAttributes.add(this.failureAttribute); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + int encountered = 0; + FlowFile flowFile = null; + while((flowFile = session.get()) != null) { + if (StringUtils.isBlank(flowFile.getAttribute(FLOW_FILE_RELEASE_VALUE))) { + this.processHoldFile(flowFile, session); + } else { + this.processSignal(flowFile, session); + } + encountered++; + } + + if (!isScheduled()) { + return; + } + if (encountered == 0) { + context.yield(); + } + } + + /** + * Stores a signal and all associated flow file attributes, as applicable. + * @param flowFile + * @param session + */ + private void processSignal(FlowFile flowFile, ProcessSession session) { + ReleaseAttributes releaseAttributes = new ReleaseAttributes(); + // Store any propagated signal attributes + if (this.copyAttributes) { + releaseAttributes.attributes.putAll(flowFile.getAttributes()); + } + + // Check if the signal indicates a failure upstream + String failureValue = flowFile.getAttribute(failureAttribute); + releaseAttributes.failed = "true".equalsIgnoreCase(failureValue); + String releaseValue = flowFile.getAttribute(FLOW_FILE_RELEASE_VALUE); + releaseValues.put(releaseValue , releaseAttributes); + if (getLogger().isDebugEnabled()) { + getLogger().debug("{} is marking flow files with {}={} for release", + new Object[] {flowFile, releaseSignalAttribute, releaseValue }); + } + + session.remove(flowFile); + } + + /** + * Transfers the held file to either Release (if a release signal has + * been found), Hold (if not), or Expired (if it has expired). + * @param flowFile + * @param session + */ + private void processHoldFile(FlowFile flowFile, ProcessSession session) { + String attributeValue = flowFile.getAttribute(releaseSignalAttribute); + + // Do we have a matching attribute to be released? + if (attributeValue != null && releaseValues.containsKey(attributeValue)) { + ReleaseAttributes releaseAttributes = releaseValues + .get(attributeValue); + boolean copyAttributes = (!releaseAttributes.attributes.isEmpty() && this.copyAttributes); + if (copyAttributes) { + Map<String, String> signalAttributes = getNewAttributes( + flowFile.getAttributes(), releaseAttributes.attributes, + this.excludedAttributes); + flowFile = session.putAllAttributes(flowFile, signalAttributes); + flowFile = session.removeAllAttributes(flowFile, + this.excludedAttributes); + } + if (getLogger().isDebugEnabled()) { + getLogger().debug("{} was released. Attributes copied? {}", + new Object[] { flowFile, copyAttributes }); + } + // Remove the signal + synchronized (releaseValues) { + releaseValues.remove(attributeValue); + if (releaseAttributes.failed) { + getLogger() + .warn("Received a non-success value for {}, routing to failure: {}", + new Object[] { failureAttribute, flowFile }); + session.transfer(flowFile, REL_FAILURE); + } else { + session.transfer(flowFile, REL_RELEASE); + } + } + } else { + // We don't have a signal yet. Let's check if it has expired + long entryDate = flowFile.getEntryDate(); + if (isExpired(entryDate)) { + session.transfer(flowFile, REL_EXPIRED); + // It expired. We likely have some expired signals too, so let's + // check + synchronized (releaseValues) { + for (Iterator<Entry<String, ReleaseAttributes>> it = releaseValues + .entrySet().iterator(); it.hasNext();) { + Entry<String, ReleaseAttributes> entry = it.next(); + ReleaseAttributes releaseAttributes = entry.getValue(); + if (isExpired(releaseAttributes.creationTimestamp)) { + releaseAttributes.attributes.clear(); + it.remove(); + } + } + } + if (getLogger().isDebugEnabled()) { + getLogger().debug("Expiring {}", new Object[] { flowFile }); + } + } else { + // It hasn't expired, so transfer to Hold + session.transfer(flowFile, REL_HOLD); + if (getLogger().isTraceEnabled()) { + getLogger().trace("Holding {}", new Object[] { flowFile }); + } + } + } + } + + // Compares two maps and returns only those attributes that are new + private static Map<String, String> getNewAttributes( + Map<String, String> existingAttributes, + Map<String, String> attributesToAdd, Set<String> alwaysOverrideKeys) { + Map<String, String> newAttributes = new HashMap<>(); + for (Entry<String, String> entryToAdd : attributesToAdd.entrySet()) { + if (!existingAttributes.containsKey(entryToAdd.getKey()) + || alwaysOverrideKeys.contains(entryToAdd.getKey())) { + newAttributes.put(entryToAdd.getKey(), entryToAdd.getValue()); + } + } + return newAttributes; + } + + // Returns true if the given time is far enough in the past to expire + private boolean isExpired(Long creationTimestamp) { + return creationTimestamp + expirationDuration < System.currentTimeMillis(); + } + + public static class ReleaseAttributes { + private long creationTimestamp = System.currentTimeMillis(); + private boolean failed; + private Map<String, String> attributes = new HashMap<>(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8177ae4f/nar-bundles/standard-bundle/standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nar-bundles/standard-bundle/standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 5f86b11..75d2b84 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nar-bundles/standard-bundle/standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -30,6 +30,7 @@ org.apache.nifi.processors.standard.GetHTTP org.apache.nifi.processors.standard.GetSFTP org.apache.nifi.processors.standard.HashAttribute org.apache.nifi.processors.standard.HashContent +org.apache.nifi.processors.standard.HoldFile org.apache.nifi.processors.standard.IdentifyMimeType org.apache.nifi.processors.standard.InvokeHTTP org.apache.nifi.processors.standard.ListenHTTP http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8177ae4f/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html b/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html new file mode 100644 index 0000000..221c8cb --- /dev/null +++ b/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html @@ -0,0 +1,158 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>HoldFile</title> + + + <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> + </head> + + <body> + + <!-- Processor Documentation ================================================== --> + <h2>Description:</h2> + <p>This processor holds incoming flow files until a matching signal + flow file enters the processor. Incoming files are classified as either + held files or signals. Held files are routed to the Held relationship + until a matching signal has been received. + </p> + + <p>This processor primarily supports the following use case: + <ul> + <li>A file in format A needs to be sent to Endpoint A</li> + <li>The same file in format B needs to be sent to Endpoint B, + but should not proceed until A has reached Endpoint A. + This restriction is most common when Endpoint B + requires some output of Endpoint A.</li> + </ul> + </p> + <p>Signal files are distinguished from held files by the presence of the + "flow.file.release.value" attribute on the signal files. When a signal + file enters, its value is cached in the processor. The processor is also + configured with a "Release Signal Attribute". Held files with this + attribute whose value matches a received signal value will be released.</p> + + <p>An example: HoldFile is configured with Release Signal Attribute = "myId". + Its 'hold' relationship routes back onto itself. + <ol> + <li>flowFile 1 { myId : "123" } enters HoldFile. It is routed to the 'hold' relationship.</li> + <li>flowFile 2 { flow.file.release.value : "123" } enters HoldFile. + Flowfile 1 is then routed to 'release'.</li> + </ol> + + <p>Signal flow files will also copy their attributes to matching held files, + unless otherwise indicated. This is what allows the output of + Endpoint A to pass to Endpoint B, above. + </p> + <p> + <strong>Uses Attributes:</strong> + </p> + <table border="1"> + <thead> + <tr> + <th>Attribute Name</th> + <th>Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>flow.file.release.value</td> + <td>Files with this attribute are considered signals. The attribute + value is stored in a cache in the processor, and any held flow files + are released if their Release Signal Attribute (configured in the processor) + matches this value.</td> + </tr> + </tbody> + </table> + + <p> + <strong>Properties:</strong> + </p> + <p>In the list below, the names of required properties appear + in bold. Any other properties (not in bold) are considered optional. + If a property has a default value, it is indicated. If a property + supports the use of the NiFi Expression Language (or simply, + "expression language"), that is also indicated.</p> + <ul> + <li><strong>Release Signal Attribute</strong> + <ul> + <li>The flow file attribute name on held files that will be checked against the signal cache.</li> + <li>Default value: none</li> + <li>Supports expression language: false</li> + </ul> + </li> + <li><strong>Max Signal Age</strong> + <ul> + <li>The maximum age of a signal that will trigger a file to be released. + Signals are expired from the cache after this duration, and + any matching held files will be routed to the 'expired' relationship. + Expected format is <duration> <time unit> where <duration> is a positive + integer and <time unit> is one of seconds, minutes, hours</li> + <li>Default value: 24 hours</li> + <li>Supports expression language: false</li> + </ul></li> + <li><strong>Copy Signal Attributes?</strong> + <ul> + <li>If true, a signal's flow file attributes will be copied to its matching held files, + with the exception of flow.file.release.value and the configured Signal Failure Attribute.</li> + <li>Default value: true</li> + <li>Supports expression language: false</li> + </ul> + </li> + <li>Signal Failure Attribute + <ul> + <li>Signals that have this attribute set to 'true' + will cause matching held flow files to route to Failure. If this attribute + is not populated, it is assumed that the flow file succeeds. This allows + upstream failures to propagate to held files.</li> + <li>Default value: no default</li> + <li>Supports expression language: false</li> + </ul> + </li> + </ul> + + <p> + <strong>Relationships:</strong> + </p> + <ul> + <li>hold + <ul> + <li>Held files whose signals have not been received are routed here. The most common pattern is to + route this relationship back into the processor, and set it on a timer.</li> + </ul> + </li> + <li>release + <ul> + <li>Held files whose signals have been received are routed here.</li> + </ul> + </li> + <li>failure + <ul> + <li>Held files whose signal contains the Signal Failure Attribute are + routed here, indicating a processing failure upstream.</li> + </ul> + </li> + <li>expired + <ul> + <li>Held files that expire are routed here.</li> + </ul> + </li> + </ul> + + </body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8177ae4f/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java new file mode 100644 index 0000000..f7b2678 --- /dev/null +++ b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java @@ -0,0 +1,155 @@ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestHoldFile { + private TestRunner testRunner; + + @Before + public void init() { + testRunner = TestRunners.newTestRunner(HoldFile.class); + } + + @Test + public void testHoldAndRelease() throws IOException { + testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours"); + testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, "identifier"); + testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, "service.failed"); + testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, "true"); + + // One signal file + Map<String, String> signal = new HashMap<String, String>(); + signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567"); + signal.put("return.value", "a response"); + testRunner.enqueue("signal".getBytes(), signal); + + // Two normal flow files to be held + Map<String, String> metadata = new HashMap<String, String>(); + metadata.put("identifier", "123456"); + + Map<String, String> metadata2 = new HashMap<String, String>(); + metadata2.put("identifier", "1234567"); + + testRunner.enqueue("file1".getBytes(), metadata); + testRunner.enqueue("file2".getBytes(), metadata2); + + testRunner.run(); + + // One was held because it didn't match + testRunner.assertTransferCount(HoldFile.REL_HOLD, 1); + // The matching one was released + testRunner.assertTransferCount(HoldFile. REL_RELEASE, 1); + + // Make sure the propagated attribute from signal was added to the held flow file + MockFlowFile released = testRunner + .getFlowFilesForRelationship(HoldFile.REL_RELEASE).get(0); + assertEquals("Signal attributes were not copied to held file", + "a response", released.getAttribute("return.value")); + assertNull("flow.file.release.value should not be propagated", released + .getAttribute(HoldFile.FLOW_FILE_RELEASE_VALUE)); + + // None expired + testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0); + + // None failed + testRunner.assertTransferCount(HoldFile.REL_FAILURE, 0); + } + + @Test + public void testHoldAndRelease_noCopy() throws IOException { + testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours"); + testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, "identifier"); + testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, "service.failed"); + testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, "false"); + + // One signal file + Map<String, String> signal = new HashMap<String, String>(); + signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567"); + signal.put("return.value", "a response"); + testRunner.enqueue("signal".getBytes(), signal); + + // Two normal flow files to be held + Map<String, String> metadata = new HashMap<String, String>(); + metadata.put("identifier", "123456"); + + Map<String, String> metadata2 = new HashMap<String, String>(); + metadata2.put("identifier", "1234567"); + + testRunner.enqueue("file1".getBytes(), metadata); + testRunner.enqueue("file2".getBytes(), metadata2); + + testRunner.run(); + + // One was held because it didn't match + testRunner.assertTransferCount(HoldFile.REL_HOLD, 1); + // The matching one was released + testRunner.assertTransferCount(HoldFile. REL_RELEASE, 1); + + // Make sure the propagated attribute from signal was added to the held flow file + MockFlowFile released = testRunner + .getFlowFilesForRelationship(HoldFile.REL_RELEASE).get(0); + assertNull("Attributes were not supposed to be copied", released + .getAttribute("return.value")); + assertNull("flow.file.release.value should not be propagated", released + .getAttribute(HoldFile.FLOW_FILE_RELEASE_VALUE)); + + // None expired + testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0); + + // None failed + testRunner.assertTransferCount(HoldFile.REL_FAILURE, 0); + } + + @Test + public void testHoldAndRelease_failure() throws IOException { + testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours"); + testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, "identifier"); + testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, "service.failed"); + testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, "false"); + + // One signal file + Map<String, String> signal = new HashMap<String, String>(); + signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567"); + signal.put("return.value", "a response"); + signal.put("service.failed", "true"); + testRunner.enqueue("signal".getBytes(), signal); + + // Two normal flow files to be held + Map<String, String> metadata = new HashMap<String, String>(); + metadata.put("identifier", "123456"); + + Map<String, String> metadata2 = new HashMap<String, String>(); + metadata2.put("identifier", "1234567"); + + testRunner.enqueue("file1".getBytes(), metadata); + testRunner.enqueue("file2".getBytes(), metadata2); + + testRunner.run(); + + // One was held because it didn't match + testRunner.assertTransferCount(HoldFile.REL_HOLD, 1); + // The matching one was routed to failure because of the + // service.failed attribute + testRunner.assertTransferCount(HoldFile. REL_FAILURE, 1); + + // Make sure the propagated attribute from signal was added to the held flow file + MockFlowFile released = testRunner + .getFlowFilesForRelationship(HoldFile.REL_FAILURE).get(0); + assertNull("Attributes were not supposed to be copied", released + .getAttribute("return.value")); + + // None expired + testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0); + } +}