Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-190 [created] 6061475ed


NIFI-190: Initial commit of HoldFile processor


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8177ae4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8177ae4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8177ae4f

Branch: refs/heads/NIFI-190
Commit: 8177ae4f1f8a6a3de23685f8068def23fe2b6b01
Parents: ac3c3bb
Author: gresockj <jgres...@gmail.com>
Authored: Fri Dec 19 22:46:42 2014 -0500
Committer: gresockj <jgres...@gmail.com>
Committed: Sat Dec 27 05:48:52 2014 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/HoldFile.java      | 283 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../index.html                                  | 158 +++++++++++
 .../nifi/processors/standard/TestHoldFile.java  | 155 ++++++++++
 4 files changed, 597 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8177ae4f/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java
 
b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java
new file mode 100644
index 0000000..aa06560
--- /dev/null
+++ 
b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java
@@ -0,0 +1,283 @@
+package org.apache.nifi.processors.standard;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.annotation.CapabilityDescription;
+import org.apache.nifi.processor.annotation.OnScheduled;
+import org.apache.nifi.processor.annotation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@Tags({"hold", "release", "signal"})
+@CapabilityDescription("Holds incoming flow files until a matching signal flow 
file enters the processor.  "
+               + "Incoming files are classified as either held files or 
signals.  "
+               + "Held files are routed to the Held relationship until a 
matching signal has been received.")
+public class HoldFile extends AbstractProcessor {
+       public static final String FLOW_FILE_RELEASE_VALUE = 
"flow.file.release.value";
+       public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+       
+       public static final PropertyDescriptor MAX_SIGNAL_AGE = new 
PropertyDescriptor
+                       .Builder().name("Max Signal Age")
+                       .description("The maximum age of a signal that will 
trigger a file to be released.  "
+                                       + "Expected format is <duration> <time 
unit> where <duration> is a positive "
+                                       + "integer and <time unit> is one of 
seconds, minutes, hours")
+                       .required(true).defaultValue("24 hours")
+                       
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, 
Integer.MAX_VALUE, TimeUnit.SECONDS))
+                       .build();
+       public static final PropertyDescriptor RELEASE_SIGNAL_ATTRIBUTE = new 
PropertyDescriptor
+                       .Builder().name("Release Signal Attribute")
+                       .description("The flow file attribute name on held 
files that will be checked against values in the signal cache.")
+                       .required(true).
+                       addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+                       .build();
+       public static final PropertyDescriptor COPY_SIGNAL_ATTRIBUTES = new 
PropertyDescriptor
+                       .Builder().name("Copy Signal Attributes?")
+                       .description("If true, a signal's flow file attributes 
will be copied to its matching held files, "
+                                       + "with the exception of 
flow.file.release.value and the configured Signal Failure Attribute")
+                       .required(true)
+                       .defaultValue("true")
+                       .allowableValues("true", "false")
+                       .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                       .build();
+       public static final PropertyDescriptor FAILURE_ATTRIBUTE = new 
PropertyDescriptor
+                       .Builder().name("Signal Failure Attribute")
+                       .description("Signals that have this attribute set to 
'true' "
+                                       + "will cause matching held flow files 
to route to Failure.  If this attribute "
+                                       + "is not populated, it is assumed that 
the flow file succeeds.")
+                       .required(false)
+                       
.addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+                       .build();
+       
+       public static final Relationship REL_HOLD = new Relationship.Builder()
+                       .name("hold").description("Held files whose signals 
have not been received are routed here").build();
+
+       public static final Relationship REL_RELEASE = new 
Relationship.Builder()
+                       .name("release").description("Held files whose signals 
have been received are routed here").build();
+
+       public static final Relationship REL_EXPIRED = new 
Relationship.Builder()
+                       .name("expired").description("Held files that expire 
are routed here").build();
+
+       public static final Relationship REL_FAILURE = new 
Relationship.Builder()
+                       .name("failure").description("Held files whose signal 
contains the Signal Failure Attribute are "
+                                       + "routed here, indicating a processing 
failure upstream").build();
+       
+       private Set<String> excludedAttributes = new HashSet<>();
+
+       private Set<Relationship> relationships;
+       private List<PropertyDescriptor> descriptors;
+
+       private String failureAttribute;
+       private volatile Map<String, ReleaseAttributes> releaseValues = new 
ConcurrentHashMap<>();
+       private long expirationDuration = 0L;
+       private String releaseSignalAttribute;
+       private boolean copyAttributes;
+       
+
+       @Override
+       protected void init(final ProcessorInitializationContext context) {
+               final Set<Relationship> relationships = new HashSet<>();
+               relationships .add(REL_HOLD);
+               relationships .add(REL_RELEASE);
+               relationships .add(REL_EXPIRED);
+               relationships .add(REL_FAILURE);
+               this.relationships = Collections.unmodifiableSet(relationships);
+
+               final List<PropertyDescriptor> descriptors = new ArrayList<>();
+               descriptors.add(RELEASE_SIGNAL_ATTRIBUTE);
+               descriptors.add(MAX_SIGNAL_AGE);
+               descriptors.add(COPY_SIGNAL_ATTRIBUTES);
+               descriptors.add(FAILURE_ATTRIBUTE);
+               this.descriptors = Collections.unmodifiableList(descriptors);
+       }
+
+       @Override
+       public final Set<Relationship> getRelationships() { 
+               return relationships;
+       }
+
+       @Override
+       public final List<PropertyDescriptor> getSupportedPropertyDescriptors() 
{
+               return descriptors;
+       }
+
+       @OnScheduled
+       public void onScheduled(final ProcessContext context) {
+               this.expirationDuration = context.getProperty(MAX_SIGNAL_AGE)
+                               .asTimePeriod(TimeUnit.MILLISECONDS);
+               this.releaseSignalAttribute = context.getProperty(
+                               RELEASE_SIGNAL_ATTRIBUTE).getValue();
+               this.copyAttributes = context.getProperty(
+                               COPY_SIGNAL_ATTRIBUTES).asBoolean();
+               PropertyValue failureAttrValue = 
context.getProperty(FAILURE_ATTRIBUTE);
+               if (failureAttrValue != null) {
+                       this.failureAttribute = failureAttrValue.getValue();
+               }
+
+               this.excludedAttributes.clear();
+               this.excludedAttributes.add(FLOW_FILE_RELEASE_VALUE);
+               if (this.failureAttribute != null) {
+                       this.excludedAttributes.add(this.failureAttribute);
+               }
+       }
+
+       @Override
+       public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
+               int encountered = 0;
+               FlowFile flowFile = null;
+               while((flowFile = session.get()) != null) {
+                       if 
(StringUtils.isBlank(flowFile.getAttribute(FLOW_FILE_RELEASE_VALUE))) {
+                               this.processHoldFile(flowFile, session);
+                       } else {
+                               this.processSignal(flowFile, session);
+                       }
+                       encountered++;
+               }       
+       
+               if (!isScheduled()) { 
+                       return; 
+               }
+               if (encountered == 0) { 
+                       context.yield();
+               }
+       }
+
+       /**
+        * Stores a signal and all associated flow file attributes, as 
applicable.
+        * @param flowFile
+        * @param session
+        */
+       private void processSignal(FlowFile flowFile, ProcessSession session) {
+               ReleaseAttributes releaseAttributes = new ReleaseAttributes();
+               // Store any propagated signal attributes
+               if (this.copyAttributes) {
+                       
releaseAttributes.attributes.putAll(flowFile.getAttributes());
+               }
+
+               // Check if the signal indicates a failure upstream
+               String failureValue = flowFile.getAttribute(failureAttribute);
+               releaseAttributes.failed = 
"true".equalsIgnoreCase(failureValue);
+               String releaseValue = 
flowFile.getAttribute(FLOW_FILE_RELEASE_VALUE);
+               releaseValues.put(releaseValue , releaseAttributes);
+               if (getLogger().isDebugEnabled()) {
+                       getLogger().debug("{} is marking flow files with {}={} 
for release", 
+                                       new Object[] {flowFile, 
releaseSignalAttribute, releaseValue });
+               }
+               
+               session.remove(flowFile);
+       }
+
+       /**
+        * Transfers the held file to either Release (if a release signal has
+        * been found), Hold (if not), or Expired (if it has expired).
+        * @param flowFile
+        * @param session
+        */
+       private void processHoldFile(FlowFile flowFile, ProcessSession session) 
{
+               String attributeValue = 
flowFile.getAttribute(releaseSignalAttribute);
+
+               // Do we have a matching attribute to be released?
+               if (attributeValue != null && 
releaseValues.containsKey(attributeValue)) {
+                       ReleaseAttributes releaseAttributes = releaseValues
+                                       .get(attributeValue);
+                       boolean copyAttributes = 
(!releaseAttributes.attributes.isEmpty() && this.copyAttributes);
+                       if (copyAttributes) {
+                               Map<String, String> signalAttributes = 
getNewAttributes(
+                                               flowFile.getAttributes(), 
releaseAttributes.attributes,
+                                               this.excludedAttributes);
+                               flowFile = session.putAllAttributes(flowFile, 
signalAttributes);
+                               flowFile = session.removeAllAttributes(flowFile,
+                                               this.excludedAttributes);
+                       }
+                       if (getLogger().isDebugEnabled()) {
+                               getLogger().debug("{} was released.  Attributes 
copied? {}",
+                                               new Object[] { flowFile, 
copyAttributes });
+                       }
+                       // Remove the signal
+                       synchronized (releaseValues) {
+                               releaseValues.remove(attributeValue);
+                               if (releaseAttributes.failed) {
+                                       getLogger()
+                                                       .warn("Received a 
non-success value for {}, routing to failure: {}",
+                                                                       new 
Object[] { failureAttribute, flowFile });
+                                       session.transfer(flowFile, REL_FAILURE);
+                               } else {
+                                       session.transfer(flowFile, REL_RELEASE);
+                               }
+                       }
+               } else {
+                       // We don't have a signal yet. Let's check if it has 
expired
+                       long entryDate = flowFile.getEntryDate();
+                       if (isExpired(entryDate)) {
+                               session.transfer(flowFile, REL_EXPIRED);
+                               // It expired. We likely have some expired 
signals too, so let's
+                               // check
+                               synchronized (releaseValues) {
+                                       for (Iterator<Entry<String, 
ReleaseAttributes>> it = releaseValues
+                                                       .entrySet().iterator(); 
it.hasNext();) {
+                                               Entry<String, 
ReleaseAttributes> entry = it.next();
+                                               ReleaseAttributes 
releaseAttributes = entry.getValue();
+                                               if 
(isExpired(releaseAttributes.creationTimestamp)) {
+                                                       
releaseAttributes.attributes.clear();
+                                                       it.remove();
+                                               }
+                                       }
+                               }
+                               if (getLogger().isDebugEnabled()) {
+                                       getLogger().debug("Expiring {}", new 
Object[] { flowFile });
+                               }
+                       } else {
+                               // It hasn't expired, so transfer to Hold
+                               session.transfer(flowFile, REL_HOLD);
+                               if (getLogger().isTraceEnabled()) {
+                                       getLogger().trace("Holding {}", new 
Object[] { flowFile });
+                               }
+                       }
+               }
+       }
+
+       // Compares two maps and returns only those attributes that are new
+       private static Map<String, String> getNewAttributes(
+                       Map<String, String> existingAttributes,
+                       Map<String, String> attributesToAdd, Set<String> 
alwaysOverrideKeys) {
+               Map<String, String> newAttributes = new HashMap<>();
+               for (Entry<String, String> entryToAdd : 
attributesToAdd.entrySet()) {
+                       if (!existingAttributes.containsKey(entryToAdd.getKey())
+                                       || 
alwaysOverrideKeys.contains(entryToAdd.getKey())) {
+                               newAttributes.put(entryToAdd.getKey(), 
entryToAdd.getValue());
+                       }
+               }
+               return newAttributes;
+       }
+
+       // Returns true if the given time is far enough in the past to expire
+       private boolean isExpired(Long creationTimestamp) {
+               return creationTimestamp + expirationDuration < 
System.currentTimeMillis();
+       }
+
+       public static class ReleaseAttributes {
+               private long creationTimestamp = System.currentTimeMillis();
+               private boolean failed;
+               private Map<String, String> attributes = new HashMap<>();
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8177ae4f/nar-bundles/standard-bundle/standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nar-bundles/standard-bundle/standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nar-bundles/standard-bundle/standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 5f86b11..75d2b84 100644
--- 
a/nar-bundles/standard-bundle/standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nar-bundles/standard-bundle/standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -30,6 +30,7 @@ org.apache.nifi.processors.standard.GetHTTP
 org.apache.nifi.processors.standard.GetSFTP
 org.apache.nifi.processors.standard.HashAttribute
 org.apache.nifi.processors.standard.HashContent
+org.apache.nifi.processors.standard.HoldFile
 org.apache.nifi.processors.standard.IdentifyMimeType
 org.apache.nifi.processors.standard.InvokeHTTP
 org.apache.nifi.processors.standard.ListenHTTP

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8177ae4f/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html
----------------------------------------------------------------------
diff --git 
a/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html
 
b/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html
new file mode 100644
index 0000000..221c8cb
--- /dev/null
+++ 
b/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html
@@ -0,0 +1,158 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>HoldFile</title>
+
+
+        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+
+        <!-- Processor Documentation 
================================================== -->
+        <h2>Description:</h2>
+        <p>This processor holds incoming flow files until a matching signal 
+        flow file enters the processor.  Incoming files are classified as either 
+        held files or signals.  Held files are routed to the 'hold' relationship 
+        until a matching signal has been received.
+        </p>
+
+               <p>This processor primarily supports the following use case:</p>
+               <ul>
+                       <li>A file in format A needs to be sent to Endpoint A</li>
+                       <li>The same file in format B needs to be sent to Endpoint B, 
+                               but should not proceed until A has reached Endpoint A.  
+                               This restriction is most common when Endpoint B 
+                               requires some output of Endpoint A.</li>
+               </ul>
+               <p>Signal files are distinguished from held files by the 
presence of the 
+               "flow.file.release.value" attribute on the signal files.  When 
a signal
+               file enters, its value is cached in the processor.  The 
processor is also
+               configured with a "Release Signal Attribute".  Held files with 
this 
+               attribute whose value matches a received signal value will be 
released.</p>
+
+               <p>An example:  HoldFile is configured with Release Signal Attribute = "myId".  
+               Its 'hold' relationship routes back onto itself.</p>
+               <ol>
+                       <li>flowFile 1 { myId : "123" } enters HoldFile.  It is routed to the 'hold' relationship.</li>
+                       <li>flowFile 2 { flow.file.release.value : "123" } enters HoldFile.  
+                       flowFile 1 is then routed to 'release'.</li>
+               </ol>
+
+               <p>Signal flow files will also copy their attributes to 
matching held files, 
+               unless otherwise indicated.  This is what allows the output of 
+               Endpoint A to pass to Endpoint B, above.
+               </p>
+        <p>
+            <strong>Uses Attributes:</strong>
+        </p>
+        <table border="1">
+            <thead>
+                <tr>
+                    <th>Attribute Name</th>
+                    <th>Description</th>
+                </tr>
+            </thead>
+            <tbody>
+                <tr>
+                    <td>flow.file.release.value</td>
+                    <td>Files with this attribute are considered signals.  The 
attribute
+                    value is stored in a cache in the processor, and any held 
flow files
+                    are released if their Release Signal Attribute (configured 
in the processor)
+                    matches this value.</td>
+                </tr>
+            </tbody>
+        </table>
+
+        <p>
+            <strong>Properties:</strong>
+        </p>
+        <p>In the list below, the names of required properties appear
+            in bold. Any other properties (not in bold) are considered 
optional.
+            If a property has a default value, it is indicated. If a property
+            supports the use of the NiFi Expression Language (or simply,
+            "expression language"), that is also indicated.</p>
+        <ul>
+            <li><strong>Release Signal Attribute</strong>
+                <ul>
+                    <li>The flow file attribute name on held files that will 
be checked against the signal cache.</li>
+                    <li>Default value: none</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Max Signal Age</strong>
+                <ul>
+                    <li>The maximum age of a signal that will trigger a file to be released.
+                    Signals are expired from the cache after this duration, and
+                    any matching held files will be routed to the 'expired' relationship.
+                    Expected format is &lt;duration&gt; &lt;time unit&gt; where &lt;duration&gt; is a positive 
+                    integer and &lt;time unit&gt; is one of seconds, minutes, hours.</li>
+                    <li>Default value: 24 hours</li>
+                    <li>Supports expression language: false</li>
+                </ul></li>
+            <li><strong>Copy Signal Attributes?</strong>
+                <ul>
+                    <li>If true, a signal's flow file attributes will be 
copied to its matching held files, 
+                                       with the exception of 
flow.file.release.value and the configured Signal Failure Attribute.</li>
+                    <li>Default value: true</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li>Signal Failure Attribute
+                <ul>
+                    <li>Signals that have this attribute set to 'true' 
+                                       will cause matching held flow files to 
route to Failure.  If this attribute 
+                                       is not populated, it is assumed that 
the flow file succeeds.  This allows
+                                       upstream failures to propagate to held 
files.</li>
+                    <li>Default value: none</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+        </ul>
+
+        <p>
+            <strong>Relationships:</strong>
+        </p>
+        <ul>
+            <li>hold
+                <ul>
+                    <li>Held files whose signals have not been received are 
routed here.  The most common pattern is to
+                    route this relationship back into the processor, and set 
it on a timer.</li>
+                </ul>
+            </li>
+            <li>release
+                <ul>
+                    <li>Held files whose signals have been received are routed 
here.</li>
+                </ul>
+            </li>
+            <li>failure
+                <ul>
+                    <li>Held files whose signal contains the Signal Failure 
Attribute are 
+                    routed here, indicating a processing failure upstream.</li>
+                </ul>
+            </li>
+            <li>expired
+                <ul>
+                    <li>Held files that expire are routed here.</li>
+                </ul>
+            </li>
+        </ul>
+
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8177ae4f/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java
 
b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java
new file mode 100644
index 0000000..f7b2678
--- /dev/null
+++ 
b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java
@@ -0,0 +1,155 @@
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link HoldFile}.  Every test enqueues one signal whose release value is
+ * {@link #MATCHING_ID} followed by two held files, only one of which carries that value in
+ * its {@link #RELEASE_ATTRIBUTE} attribute.
+ */
+public class TestHoldFile {
+    private static final String RELEASE_ATTRIBUTE = "identifier";
+    private static final String FAILURE_ATTRIBUTE = "service.failed";
+    private static final String SIGNAL_ONLY_ATTRIBUTE = "return.value";
+    private static final String MATCHING_ID = "1234567";
+    private static final String NON_MATCHING_ID = "123456";
+
+    private TestRunner testRunner;
+
+    @Before
+    public void init() {
+        testRunner = TestRunners.newTestRunner(HoldFile.class);
+    }
+
+    @Test
+    public void testHoldAndRelease() throws IOException {
+        configure("true");
+        enqueueSignalAndHeldFiles(false);
+
+        testRunner.run();
+
+        // One was held because it didn't match
+        testRunner.assertTransferCount(HoldFile.REL_HOLD, 1);
+        // The matching one was released
+        testRunner.assertTransferCount(HoldFile.REL_RELEASE, 1);
+
+        // The signal's attribute must have been propagated to the released file,
+        // but never the release value itself
+        MockFlowFile released = testRunner.getFlowFilesForRelationship(HoldFile.REL_RELEASE).get(0);
+        assertEquals("Signal attributes were not copied to held file",
+                "a response", released.getAttribute(SIGNAL_ONLY_ATTRIBUTE));
+        assertNull("flow.file.release.value should not be propagated",
+                released.getAttribute(HoldFile.FLOW_FILE_RELEASE_VALUE));
+
+        // None expired
+        testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0);
+
+        // None failed
+        testRunner.assertTransferCount(HoldFile.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testHoldAndRelease_noCopy() throws IOException {
+        configure("false");
+        enqueueSignalAndHeldFiles(false);
+
+        testRunner.run();
+
+        // One was held because it didn't match
+        testRunner.assertTransferCount(HoldFile.REL_HOLD, 1);
+        // The matching one was released
+        testRunner.assertTransferCount(HoldFile.REL_RELEASE, 1);
+
+        // Copying is disabled, so nothing from the signal may appear on the released file
+        MockFlowFile released = testRunner.getFlowFilesForRelationship(HoldFile.REL_RELEASE).get(0);
+        assertNull("Attributes were not supposed to be copied",
+                released.getAttribute(SIGNAL_ONLY_ATTRIBUTE));
+        assertNull("flow.file.release.value should not be propagated",
+                released.getAttribute(HoldFile.FLOW_FILE_RELEASE_VALUE));
+
+        // None expired
+        testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0);
+
+        // None failed
+        testRunner.assertTransferCount(HoldFile.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testHoldAndRelease_failure() throws IOException {
+        configure("false");
+        enqueueSignalAndHeldFiles(true);
+
+        testRunner.run();
+
+        // One was held because it didn't match
+        testRunner.assertTransferCount(HoldFile.REL_HOLD, 1);
+        // The matching one was routed to failure because of the
+        // service.failed attribute
+        testRunner.assertTransferCount(HoldFile.REL_FAILURE, 1);
+
+        // Copying is disabled, so nothing from the signal may appear on the failed file
+        MockFlowFile failed = testRunner.getFlowFilesForRelationship(HoldFile.REL_FAILURE).get(0);
+        assertNull("Attributes were not supposed to be copied",
+                failed.getAttribute(SIGNAL_ONLY_ATTRIBUTE));
+
+        // None expired
+        testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0);
+    }
+
+    // Applies the configuration shared by all tests; only attribute copying varies
+    private void configure(String copySignalAttributes) {
+        testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours");
+        testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, RELEASE_ATTRIBUTE);
+        testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, FAILURE_ATTRIBUTE);
+        testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, copySignalAttributes);
+    }
+
+    // Enqueues one signal (optionally flagged as failed), then a non-matching and a
+    // matching held file, in that order
+    private void enqueueSignalAndHeldFiles(boolean signalFailed) {
+        // One signal file
+        Map<String, String> signal = new HashMap<String, String>();
+        signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, MATCHING_ID);
+        signal.put(SIGNAL_ONLY_ATTRIBUTE, "a response");
+        if (signalFailed) {
+            signal.put(FAILURE_ATTRIBUTE, "true");
+        }
+        testRunner.enqueue("signal".getBytes(), signal);
+
+        // Two normal flow files to be held
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put(RELEASE_ATTRIBUTE, NON_MATCHING_ID);
+
+        Map<String, String> metadata2 = new HashMap<String, String>();
+        metadata2.put(RELEASE_ATTRIBUTE, MATCHING_ID);
+
+        testRunner.enqueue("file1".getBytes(), metadata);
+        testRunner.enqueue("file2".getBytes(), metadata2);
+    }
+}

Reply via email to