http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index cbcc54d..385ac73 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -23,7 +23,8 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -57,6 +58,7 @@ import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
  *
  */
 @SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"hadoop", "sequence file", "create", "sequencefile"})
 @CapabilityDescription("Creates Hadoop Sequence Files from incoming flow 
files")
 @SeeAlso(PutHDFS.class)
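
Every hunk in this commit applies the same pattern: the concrete processor class gains an @InputRequirement annotation telling the framework whether the processor needs an incoming connection (INPUT_REQUIRED), may use one but does not need it (INPUT_ALLOWED, e.g. GetHDFS below), or must not have one because it is a pure source (INPUT_FORBIDDEN, e.g. ListHDFS and GetKafka below). A minimal sketch of the pattern, using a hypothetical DemoProcessor that is not part of this commit:

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;

// Hypothetical example, for illustration only: declares that this processor
// must have an incoming connection before the flow is considered valid.
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"example"})
@CapabilityDescription("Illustrates the InputRequirement pattern applied throughout this commit")
public class DemoProcessor extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        final FlowFile flowFile = session.get();
        if (flowFile == null) { // still guard: the annotation validates connections, not queue contents
            return;
        }
        session.remove(flowFile); // a real processor would process and transfer to a Relationship
    }
}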

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 4a52fb7..aa03e73 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -44,6 +46,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"})
 @CapabilityDescription("Retrieves a file from HDFS. The content of the 
incoming FlowFile is replaced by the content of the file in HDFS. "
         + "The file in HDFS is left intact without any changes being made to 
it.")

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index de776d4..4c9deea 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -62,6 +64,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
 @TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
 @CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) 
into FlowFiles. This Processor will delete the file from HDFS after fetching 
it.")
 @WritesAttributes({

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 151cbf2..563bda8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -66,6 +68,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 
 @TriggerSerially
 @TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
 @CapabilityDescription("Retrieves a listing of files from HDFS. For each file 
that is listed in HDFS, creates a FlowFile that represents "
         + "the HDFS file so that it can be fetched in conjunction with 
ListHDFS. This Processor is designed to run on Primary Node only "

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 901159b..bedf1b9 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -59,6 +61,7 @@ import org.apache.nifi.util.StopWatch;
 /**
  * This processor copies FlowFiles to HDFS.
  */
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"hadoop", "HDFS", "put", "copy", "filesystem"})
 @CapabilityDescription("Write FlowFile data to Hadoop Distributed File System 
(HDFS)")
 @WritesAttribute(attribute = "filename", description = "The name of the file 
written to HDFS comes from the value of this attribute.")

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
index 574fb2d..3a6ac79 100644
--- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
+++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -57,6 +59,7 @@ import ca.uhn.hl7v2.validation.impl.ValidationContextFactory;
 
 @SideEffectFree
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"HL7", "health level 7", "healthcare", "extract", "attributes"})
 @CapabilityDescription("Extracts information from an HL7 (Health Level 7) 
formatted FlowFile and adds the information as FlowFile Attributes. "
         + "The attributes are named as <Segment Name> <dot> <Field Index>. If 
the segment is repeating, the naming will be "

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
index 53e7e69..26e8bb6 100644
--- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
+++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
@@ -29,6 +29,8 @@ import java.util.Set;
 import org.apache.nifi.annotation.behavior.DynamicProperties;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -63,6 +65,7 @@ import ca.uhn.hl7v2.validation.impl.ValidationContextFactory;
 @EventDriven
 @SideEffectFree
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"HL7", "healthcare", "route", "Health Level 7"})
 @DynamicProperties({
     @DynamicProperty(name = "Name of a Relationship", value = "An HL7 Query 
Language query",

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java
index 7fe6195..b44eccd 100644
--- a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java
+++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java
@@ -16,11 +16,18 @@
  */
 package org.apache.nifi.processors.image;
 
-import com.drew.imaging.ImageMetadataReader;
-import com.drew.imaging.ImageProcessingException;
-import com.drew.metadata.Directory;
-import com.drew.metadata.Metadata;
-import com.drew.metadata.Tag;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -30,25 +37,22 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.ObjectHolder;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.HashMap;
+import com.drew.imaging.ImageMetadataReader;
+import com.drew.imaging.ImageProcessingException;
+import com.drew.metadata.Directory;
+import com.drew.metadata.Metadata;
+import com.drew.metadata.Tag;
 
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Exif", "Exchangeable", "image", "file", "format", "JPG", "GIF", "PNG", 
"BMP", "metadata","IPTC", "XMP"})
 @CapabilityDescription("Extract the image metadata from flowfiles containing 
images. This processor relies on this "
         + "metadata extractor library 
https://github.com/drewnoakes/metadata-extractor. It extracts a long list of "

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java
index c085b5f..176561f 100644
--- a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java
+++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java
@@ -17,7 +17,27 @@
 
 package org.apache.nifi.processors.image;
 
+import java.awt.Graphics2D;
+import java.awt.Image;
+import java.awt.image.BufferedImage;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.imageio.ImageIO;
+import javax.imageio.ImageReader;
+import javax.imageio.stream.ImageInputStream;
+
 import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -33,25 +53,9 @@ import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
-import javax.imageio.ImageIO;
-import javax.imageio.ImageReader;
-import javax.imageio.stream.ImageInputStream;
-import java.awt.Image;
-import java.awt.Graphics2D;
-import java.awt.image.BufferedImage;
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
-
 @EventDriven
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({ "resize", "image", "jpg", "jpeg", "png", "bmp", "wbmp", "gif" })
 @CapabilityDescription("Resizes an image to user-specified dimensions. This 
Processor uses the image codecs registered with the "
     + "environment that NiFi is running in. By default, this includes JPEG, 
PNG, BMP, WBMP, and GIF images.")

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 26590df..e10977b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -32,18 +32,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -58,7 +53,15 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Fetches messages from Apache Kafka")
 @Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
 @WritesAttributes({

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index d83c7bf..cff285c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -30,10 +30,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -58,9 +56,13 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
 import org.apache.nifi.util.LongHolder;
 
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
 import scala.actors.threadpool.Arrays;
 
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
 @CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Kafka")
 public class PutKafka extends AbstractProcessor {

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index 6c20a8f..6f126aa 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@ -18,18 +18,20 @@
  */
 package org.apache.nifi.processors.kite;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+import static org.apache.nifi.processor.util.StandardValidators.createLongValidator;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -53,11 +55,13 @@ import org.kitesdk.data.spi.DefaultConfiguration;
 import org.kitesdk.data.spi.filesystem.CSVFileReader;
 import org.kitesdk.data.spi.filesystem.CSVProperties;
 
-import static org.apache.nifi.processor.util.StandardValidators.createLongValidator;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 @Tags({"kite", "csv", "avro"})
-@CapabilityDescription(
-        "Converts CSV files to Avro according to an Avro Schema")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts CSV files to Avro according to an Avro 
Schema")
 public class ConvertCSVToAvro extends AbstractKiteProcessor {
 
     private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build();

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
index ec1503c..af120bf 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
@@ -18,18 +18,18 @@
  */
 package org.apache.nifi.processors.kite;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -47,9 +47,13 @@ import org.kitesdk.data.SchemaNotFoundException;
 import org.kitesdk.data.spi.DefaultConfiguration;
 import org.kitesdk.data.spi.filesystem.JSONFileReader;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
 @Tags({"kite", "json", "avro"})
-@CapabilityDescription(
-        "Converts JSON files to Avro according to an Avro Schema")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts JSON files to Avro according to an Avro 
Schema")
 public class ConvertJSONToAvro extends AbstractKiteProcessor {
 
     private static final Relationship SUCCESS = new Relationship.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
index 7a30db1..1986f0b 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
@@ -18,16 +18,17 @@
  */
 package org.apache.nifi.processors.kite;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -46,6 +47,10 @@ import org.kitesdk.data.ValidationException;
 import org.kitesdk.data.View;
 import org.kitesdk.data.spi.SchemaValidationUtil;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"kite", "avro", "parquet", "hadoop", "hive", "hdfs", "hbase"})
 @CapabilityDescription("Stores Avro records in a Kite dataset")
 public class StoreInKiteDataset extends AbstractKiteProcessor {

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java b/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java
index 8398152..5f58781 100644
--- a/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java
+++ b/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java
@@ -33,6 +33,8 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 
 import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -70,6 +72,7 @@ import com.sun.jersey.api.json.JSONConfiguration;
 import com.sun.jersey.core.util.MultivaluedMapImpl;
 
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"yandex", "translate", "translation", "language"})
 @CapabilityDescription("Translates content and attributes from one language to 
another")
 @WritesAttributes({

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
index a78b112..e41b583 100644
--- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
+++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
@@ -32,6 +32,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -57,8 +59,8 @@ import org.apache.nifi.processor.util.StandardValidators;
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Client;
 import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.Location.Coordinate ;
 import com.twitter.hbc.core.endpoint.Location ;
+import com.twitter.hbc.core.endpoint.Location.Coordinate ;
 import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
 import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
 import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
@@ -69,6 +71,7 @@ import com.twitter.hbc.httpclient.auth.Authentication;
 import com.twitter.hbc.httpclient.auth.OAuth1;
 
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"twitter", "tweets", "social media", "status", "json"})
 @CapabilityDescription("Pulls status changes from Twitter's streaming API")
 @WritesAttribute(attribute = "mime.type", description = "Sets mime type to 
application/json")

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
index ff264a1..a85aa0f 100644
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
@@ -18,7 +18,29 @@
  */
 package org.apache.nifi.processors.solr;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
@@ -41,27 +63,8 @@ import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 @Tags({"Apache", "Solr", "Get", "Pull"})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Queries Solr and outputs the results as a FlowFile")
 public class GetSolr extends SolrProcessor {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
index 560ad34..df034c9 100644
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java
@@ -18,7 +18,24 @@
  */
 package org.apache.nifi.processors.solr;
 
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -40,22 +57,8 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.MultiMapSolrParams;
 import org.apache.solr.common.util.ContentStreamBase;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
 @Tags({"Apache", "Solr", "Put", "Send"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to 
Solr")
 @DynamicProperty(name="A Solr request parameter name", value="A Solr request 
parameter value",
         description="These parameters will be passed to Solr on the request")

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
index 9887e38..816b407 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
@@ -29,6 +29,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.codec.binary.Base64InputStream;
 import org.apache.commons.codec.binary.Base64OutputStream;
 import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -51,101 +53,102 @@ import org.apache.nifi.util.StopWatch;
 @SupportsBatching
 @Tags({"encode", "base64"})
 @CapabilityDescription("Encodes or decodes content to and from base64")
+@InputRequirement(Requirement.INPUT_REQUIRED)
 public class Base64EncodeContent extends AbstractProcessor {
 
-    public static final String ENCODE_MODE = "Encode";
-    public static final String DECODE_MODE = "Decode";
+       public static final String ENCODE_MODE = "Encode";
+       public static final String DECODE_MODE = "Decode";
 
-    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
-            .name("Mode")
-            .description("Specifies whether the content should be encoded or decoded")
-            .required(true)
-            .allowableValues(ENCODE_MODE, DECODE_MODE)
-            .defaultValue(ENCODE_MODE)
-            .build();
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Any FlowFile that is successfully encoded or decoded will be routed to success")
-            .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
-            .build();
+       public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+               .name("Mode")
+               .description("Specifies whether the content should be encoded or decoded")
+               .required(true)
+               .allowableValues(ENCODE_MODE, DECODE_MODE)
+               .defaultValue(ENCODE_MODE)
+               .build();
+       public static final Relationship REL_SUCCESS = new Relationship.Builder()
+               .name("success")
+               .description("Any FlowFile that is successfully encoded or decoded will be routed to success")
+               .build();
+       public static final Relationship REL_FAILURE = new Relationship.Builder()
+               .name("failure")
+               .description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
+               .build();
 
-    private List<PropertyDescriptor> properties;
-    private Set<Relationship> relationships;
+       private List<PropertyDescriptor> properties;
+       private Set<Relationship> relationships;
 
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(MODE);
-        this.properties = Collections.unmodifiableList(properties);
+       @Override
+       protected void init(final ProcessorInitializationContext context) {
+               final List<PropertyDescriptor> properties = new ArrayList<>();
+               properties.add(MODE);
+               this.properties = Collections.unmodifiableList(properties);
 
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        this.relationships = Collections.unmodifiableSet(relationships);
-    }
+               final Set<Relationship> relationships = new HashSet<>();
+               relationships.add(REL_SUCCESS);
+               relationships.add(REL_FAILURE);
+               this.relationships = Collections.unmodifiableSet(relationships);
+       }
 
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
+       @Override
+       public Set<Relationship> getRelationships() {
+               return relationships;
+       }
 
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
+       @Override
+       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+               return properties;
+       }
 
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
+       @Override
+       public void onTrigger(final ProcessContext context, final ProcessSession session) {
+               FlowFile flowFile = session.get();
+               if (flowFile == null) {
+                       return;
+               }
 
-        final ProcessorLog logger = getLogger();
+               final ProcessorLog logger = getLogger();
 
-        boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
-        try {
-            final StopWatch stopWatch = new StopWatch(true);
-            if (encode) {
-                flowFile = session.write(flowFile, new StreamCallback() {
-                    @Override
-                    public void process(InputStream in, OutputStream out) throws IOException {
-                        try (Base64OutputStream bos = new Base64OutputStream(out)) {
-                            int len = -1;
-                            byte[] buf = new byte[8192];
-                            while ((len = in.read(buf)) > 0) {
-                                bos.write(buf, 0, len);
-                            }
-                            bos.flush();
-                        }
-                    }
-                });
-            } else {
-                flowFile = session.write(flowFile, new StreamCallback() {
-                    @Override
-                    public void process(InputStream in, OutputStream out) throws IOException {
-                        try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
-                            int len = -1;
-                            byte[] buf = new byte[8192];
-                            while ((len = bis.read(buf)) > 0) {
-                                out.write(buf, 0, len);
-                            }
-                            out.flush();
-                        }
-                    }
-                });
-            }
+               boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+               try {
+                       final StopWatch stopWatch = new StopWatch(true);
+                       if (encode) {
+                               flowFile = session.write(flowFile, new StreamCallback() {
+                                       @Override
+                                       public void process(InputStream in, OutputStream out) throws IOException {
+                                               try (Base64OutputStream bos = new Base64OutputStream(out)) {
+                                                       int len = -1;
+                                                       byte[] buf = new byte[8192];
+                                                       while ((len = in.read(buf)) > 0) {
+                                                               bos.write(buf, 0, len);
+                                                       }
+                                                       bos.flush();
+                                               }
+                                       }
+                               });
+                       } else {
+                               flowFile = session.write(flowFile, new StreamCallback() {
+                                       @Override
+                                       public void process(InputStream in, OutputStream out) throws IOException {
+                                               try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
+                                                       int len = -1;
+                                                       byte[] buf = new byte[8192];
+                                                       while ((len = bis.read(buf)) > 0) {
+                                                               out.write(buf, 0, len);
+                                                       }
+                                                       out.flush();
+                                               }
+                                       }
+                               });
+                       }
 
-            logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
-            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(flowFile, REL_SUCCESS);
-        } catch (ProcessException e) {
-            logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
-            session.transfer(flowFile, REL_FAILURE);
-        }
-    }
+                       logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
+                       session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                       session.transfer(flowFile, REL_SUCCESS);
+               } catch (ProcessException e) {
+                       logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
+                       session.transfer(flowFile, REL_FAILURE);
+               }
+       }
 
 }
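
Note that this hunk reindents the whole class body in addition to adding the annotation, which is why nearly every line appears in the diff; the logic itself is unchanged. onTrigger still guards against session.get() returning null even with INPUT_REQUIRED, since the annotation constrains how the flow is wired, not what any single trigger sees. A usage sketch against this processor with the nifi-mock TestRunner (assumed to be on the test classpath; illustrative only, not part of this commit):

import org.apache.nifi.processors.standard.Base64EncodeContent;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class Base64EncodeContentSketch {
    public static void main(final String[] args) {
        final TestRunner runner = TestRunners.newTestRunner(Base64EncodeContent.class);
        runner.setProperty(Base64EncodeContent.MODE, Base64EncodeContent.ENCODE_MODE);
        runner.enqueue("hello".getBytes()); // queue one FlowFile of raw content
        runner.run();                       // one onTrigger invocation
        // the encoded FlowFile should land on the success relationship
        runner.assertAllFlowFilesTransferred(Base64EncodeContent.REL_SUCCESS, 1);
    }
}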

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index 1b9b20c..593cf44 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -29,20 +29,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import lzma.sdk.lzma.Decoder;
-import lzma.streams.LzmaInputStream;
-import lzma.streams.LzmaOutputStream;
-
 import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -63,9 +61,14 @@ import org.tukaani.xz.LZMA2Options;
 import org.tukaani.xz.XZInputStream;
 import org.tukaani.xz.XZOutputStream;
 
+import lzma.sdk.lzma.Decoder;
+import lzma.streams.LzmaInputStream;
+import lzma.streams.LzmaOutputStream;
+
 @EventDriven
 @SideEffectFree
 @SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", 
"xz-lzma2"})
 @CapabilityDescription("Compresses or decompresses the contents of FlowFiles 
using a user-specified compression algorithm and updates the mime.type "
     + "attribute as appropriate")

http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index 2efc852..a45c211 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -31,6 +31,12 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -43,10 +49,6 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.timebuffer.EntityAccess;
@@ -54,344 +56,345 @@ import org.apache.nifi.util.timebuffer.TimedBuffer;
 
 @SideEffectFree
 @TriggerSerially
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"rate control", "throttle", "rate", "throughput"})
 @CapabilityDescription("Controls the rate at which data is transferred to 
follow-on processors.")
 public class ControlRate extends AbstractProcessor {
 
-    public static final String DATA_RATE = "data rate";
-    public static final String FLOWFILE_RATE = "flowfile count";
-    public static final String ATTRIBUTE_RATE = "attribute value";
-
-    public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
-            .name("Rate Control Criteria")
-            .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
-            .required(true)
-            .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
-            .defaultValue(DATA_RATE)
-            .build();
-    public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
-            .name("Maximum Rate")
-            .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
-                    + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
-            .build();
-    public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
-            .name("Rate Controlled Attribute")
-            .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
-                    + "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. "
-                    + "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .build();
-    public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
-            .name("Time Duration")
-            .description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.")
-            .required(true)
-            .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
-            .defaultValue("1 min")
-            .build();
-    public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
-            .name("Grouping Attribute")
-            .description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for "
-                    + "each value specified by the attribute with this name. Changing this value resets the rate counters.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .build();
-
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("All FlowFiles are transferred to this relationship")
-            .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format")
-            .build();
-
-    private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
-    private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
-
-    private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
-    private List<PropertyDescriptor> properties;
-    private Set<Relationship> relationships;
-    private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(RATE_CONTROL_CRITERIA);
-        properties.add(MAX_RATE);
-        properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
-        properties.add(TIME_PERIOD);
-        properties.add(GROUPING_ATTRIBUTE_NAME);
-        this.properties = Collections.unmodifiableList(properties);
-
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        this.relationships = Collections.unmodifiableSet(relationships);
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final 
ValidationContext context) {
-        final List<ValidationResult> validationResults = new 
ArrayList<>(super.customValidate(context));
-
-        final Validator rateValidator;
-        switch 
(context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
-            case DATA_RATE:
-                rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
-                break;
-            case ATTRIBUTE_RATE:
-                rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
-                final String rateAttr = 
context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
-                if (rateAttr == null) {
-                    validationResults.add(new ValidationResult.Builder()
-                            .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
-                            .explanation("<Rate Controlled Attribute> property 
must be set if using <Rate Control Criteria> of 'attribute value'")
-                            .build());
-                }
-                break;
-            case FLOWFILE_RATE:
-            default:
-                rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
-                break;
-        }
-
-        final ValidationResult rateResult = rateValidator.validate("Maximum 
Rate", context.getProperty(MAX_RATE).getValue(), context);
-        if (!rateResult.isValid()) {
-            validationResults.add(rateResult);
-        }
-
-        return validationResults;
-    }
-
-    @Override
-    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
-        super.onPropertyModified(descriptor, oldValue, newValue);
-
-        if (descriptor.equals(RATE_CONTROL_CRITERIA)
-                || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
-                || descriptor.equals(GROUPING_ATTRIBUTE_NAME)
-                || descriptor.equals(TIME_PERIOD)) {
-            // if the criteria that is being used to determine 
limits/throttles is changed, we must clear our throttle map.
-            throttleMap.clear();
-        } else if (descriptor.equals(MAX_RATE)) {
-            final long newRate;
-            if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
-                newRate = DataUnit.parseDataSize(newValue, 
DataUnit.B).longValue();
-            } else {
-                newRate = Long.parseLong(newValue);
-            }
-
-            for (final Throttle throttle : throttleMap.values()) {
-                throttle.setMaxRate(newRate);
-            }
-        }
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final long lastClearTime = lastThrottleClearTime.get();
-        final long throttleExpirationMillis = System.currentTimeMillis() - 2 * 
context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
-        if (lastClearTime < throttleExpirationMillis) {
-            if (lastThrottleClearTime.compareAndSet(lastClearTime, 
System.currentTimeMillis())) {
-                final Iterator<Map.Entry<String, Throttle>> itr = 
throttleMap.entrySet().iterator();
-                while (itr.hasNext()) {
-                    final Map.Entry<String, Throttle> entry = itr.next();
-                    final Throttle throttle = entry.getValue();
-                    if (throttle.tryLock()) {
-                        try {
-                            if (throttle.lastUpdateTime() < lastClearTime) {
-                                itr.remove();
-                            }
-                        } finally {
-                            throttle.unlock();
-                        }
-                    }
-                }
-            }
-        }
-
-        // TODO: Should periodically clear any Throttle that has not been used 
in more than 2 throttling periods
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final ProcessorLog logger = getLogger();
-        final long seconds = 
context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
-        final String rateControlAttributeName = 
context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
-        long rateValue;
-        switch 
(context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
-            case DATA_RATE:
-                rateValue = flowFile.getSize();
-                break;
-            case FLOWFILE_RATE:
-                rateValue = 1;
-                break;
-            case ATTRIBUTE_RATE:
-                final String attributeValue = 
flowFile.getAttribute(rateControlAttributeName);
-                if (attributeValue == null) {
-                    logger.error("routing {} to 'failure' because FlowFile is 
missing required attribute {}", new Object[]{flowFile, 
rateControlAttributeName});
-                    session.transfer(flowFile, REL_FAILURE);
-                    return;
-                }
-
-                if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
-                    logger.error("routing {} to 'failure' because FlowFile 
attribute {} has a value of {}, which is not a positive long",
-                            new Object[]{flowFile, rateControlAttributeName, 
attributeValue});
-                    session.transfer(flowFile, REL_FAILURE);
-                    return;
-                }
-                rateValue = Long.parseLong(attributeValue);
-                break;
-            default:
-                throw new AssertionError("<Rate Control Criteria> property set 
to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue());
-        }
-
-        final String groupingAttributeName = 
context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
-        final String groupName = (groupingAttributeName == null) ? 
DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
-        Throttle throttle = throttleMap.get(groupName);
-        if (throttle == null) {
-            throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
-
-            final String maxRateValue = 
context.getProperty(MAX_RATE).getValue();
-            final long newRate;
-            if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
-                newRate = DataUnit.parseDataSize(maxRateValue, 
DataUnit.B).longValue();
-            } else {
-                newRate = Long.parseLong(maxRateValue);
-            }
-            throttle.setMaxRate(newRate);
-
-            throttleMap.put(groupName, throttle);
-        }
-
-        throttle.lock();
-        try {
-            if (throttle.tryAdd(rateValue)) {
-                logger.info("transferring {} to 'success'", new 
Object[]{flowFile});
-                session.transfer(flowFile, REL_SUCCESS);
-            } else {
-                flowFile = session.penalize(flowFile);
-                session.transfer(flowFile);
-            }
-        } finally {
-            throttle.unlock();
-        }
-    }
-
-    private static class TimestampedLong {
-
-        private final Long value;
-        private final long timestamp = System.currentTimeMillis();
-
-        public TimestampedLong(final Long value) {
-            this.value = value;
-        }
-
-        public Long getValue() {
-            return value;
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-    }
-
-    private static class RateEntityAccess implements 
EntityAccess<TimestampedLong> {
-
-        @Override
-        public TimestampedLong aggregate(TimestampedLong oldValue, 
TimestampedLong toAdd) {
-            if (oldValue == null && toAdd == null) {
-                return new TimestampedLong(0L);
-            } else if (oldValue == null) {
-                return toAdd;
-            } else if (toAdd == null) {
-                return oldValue;
-            }
-
-            return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
-        }
-
-        @Override
-        public TimestampedLong createNew() {
-            return new TimestampedLong(0L);
-        }
-
-        @Override
-        public long getTimestamp(TimestampedLong entity) {
-            return entity == null ? 0L : entity.getTimestamp();
-        }
-    }
-
-    private static class Throttle extends ReentrantLock {
-
-        private final AtomicLong maxRate = new AtomicLong(1L);
-        private final long timePeriodValue;
-        private final TimeUnit timePeriodUnit;
-        private final TimedBuffer<TimestampedLong> timedBuffer;
-        private final ProcessorLog logger;
-
-        private volatile long penalizationExpired;
-        private volatile long lastUpdateTime;
-
-        public Throttle(final int timePeriod, final TimeUnit unit, final 
ProcessorLog logger) {
-            this.timePeriodUnit = unit;
-            this.timePeriodValue = timePeriod;
-            this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new 
RateEntityAccess());
-            this.logger = logger;
-        }
-
-        public void setMaxRate(final long maxRate) {
-            this.maxRate.set(maxRate);
-        }
-
-        public long lastUpdateTime() {
-            return lastUpdateTime;
-        }
-
-        public boolean tryAdd(final long value) {
-            final long now = System.currentTimeMillis();
-            if (penalizationExpired > now) {
-                return false;
-            }
-
-            final long maxRateValue = maxRate.get();
-
-            final TimestampedLong sum = 
timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, 
timePeriodUnit));
-            if (sum != null && sum.getValue() >= maxRateValue) {
-                logger.debug("current sum for throttle is {}, so not allowing 
rate of {} through", new Object[]{sum.getValue(), value});
-                return false;
-            }
-
-            logger.debug("current sum for throttle is {}, so allowing rate of 
{} through",
-                    new Object[]{sum == null ? 0 : sum.getValue(), value});
-
-            final long transferred = timedBuffer.add(new 
TimestampedLong(value)).getValue();
-            if (transferred > maxRateValue) {
-                final long amountOver = transferred - maxRateValue;
-                // determine how long it should take to transfer 'amountOver' 
and 'penalize' the Throttle for that long
-                final long milliDuration = 
TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
-                final double pct = (double) amountOver / (double) maxRateValue;
-                final long penalizationPeriod = (long) (milliDuration * pct);
-                this.penalizationExpired = now + penalizationPeriod;
-                logger.debug("allowing rate of {} through but penalizing 
Throttle for {} milliseconds", new Object[]{value, penalizationPeriod});
-            }
-
-            lastUpdateTime = now;
-            return true;
-        }
-    }
+       public static final String DATA_RATE = "data rate";
+       public static final String FLOWFILE_RATE = "flowfile count";
+       public static final String ATTRIBUTE_RATE = "attribute value";
+
+       public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new 
PropertyDescriptor.Builder()
+               .name("Rate Control Criteria")
+               .description("Indicates the criteria that is used to control 
the throughput rate. Changing this value resets the rate counters.")
+               .required(true)
+               .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
+               .defaultValue(DATA_RATE)
+               .build();
+       public static final PropertyDescriptor MAX_RATE = new 
PropertyDescriptor.Builder()
+               .name("Maximum Rate")
+               .description("The maximum rate at which data should pass 
through this processor. The format of this property is expected to be a "
+                       + "positive integer, or a Data Size (such as '1 MB') if 
Rate Control Criteria is set to 'data rate'.")
+               .required(true)
+               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // 
validated in customValidate b/c dependent on Rate Control Criteria
+               .build();
+       public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = 
new PropertyDescriptor.Builder()
+               .name("Rate Controlled Attribute")
+               .description("The name of an attribute whose values build 
toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
+                       + "The value of the attribute referenced by this 
property must be a positive long, or the FlowFile will be routed to failure. "
+                       + "This value is ignored if Rate Control Criteria is 
not set to 'attribute value'. Changing this value resets the rate counters.")
+               .required(false)
+               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+               .expressionLanguageSupported(false)
+               .build();
+       public static final PropertyDescriptor TIME_PERIOD = new 
PropertyDescriptor.Builder()
+               .name("Time Duration")
+               .description("The amount of time to which the Maximum Data Size 
and Maximum Number of Files pertains. Changing this value resets the rate 
counters.")
+               .required(true)
+               .addValidator(StandardValidators.createTimePeriodValidator(1, 
TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+               .defaultValue("1 min")
+               .build();
+       public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new 
PropertyDescriptor.Builder()
+               .name("Grouping Attribute")
+               .description("By default, a single \"throttle\" is used for all 
FlowFiles. If this value is specified, a separate throttle is used for "
+                       + "each value specified by the attribute with this 
name. Changing this value resets the rate counters.")
+               .required(false)
+               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+               .expressionLanguageSupported(false)
+               .build();
+
+       public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+               .name("success")
+               .description("All FlowFiles are transferred to this 
relationship")
+               .build();
+       public static final Relationship REL_FAILURE = new 
Relationship.Builder()
+               .name("failure")
+               .description("FlowFiles will be routed to this relationship if 
they are missing a necessary attribute or the attribute is not in the expected 
format")
+               .build();
+
+       private static final Pattern POSITIVE_LONG_PATTERN = 
Pattern.compile("0*[1-9][0-9]*");
+       private static final String DEFAULT_GROUP_ATTRIBUTE = 
ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
+
+       private final ConcurrentMap<String, Throttle> throttleMap = new 
ConcurrentHashMap<>();
+       private List<PropertyDescriptor> properties;
+       private Set<Relationship> relationships;
+       private final AtomicLong lastThrottleClearTime = new 
AtomicLong(System.currentTimeMillis());
+
+       @Override
+       protected void init(final ProcessorInitializationContext context) {
+               final List<PropertyDescriptor> properties = new ArrayList<>();
+               properties.add(RATE_CONTROL_CRITERIA);
+               properties.add(MAX_RATE);
+               properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
+               properties.add(TIME_PERIOD);
+               properties.add(GROUPING_ATTRIBUTE_NAME);
+               this.properties = Collections.unmodifiableList(properties);
+
+               final Set<Relationship> relationships = new HashSet<>();
+               relationships.add(REL_SUCCESS);
+               this.relationships = Collections.unmodifiableSet(relationships);
+       }
+
+       @Override
+       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+               return properties;
+       }
+
+       @Override
+       public Set<Relationship> getRelationships() {
+               return relationships;
+       }
+
+       @Override
+       protected Collection<ValidationResult> customValidate(final 
ValidationContext context) {
+               final List<ValidationResult> validationResults = new 
ArrayList<>(super.customValidate(context));
+
+               final Validator rateValidator;
+               switch 
(context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+                       case DATA_RATE:
+                               rateValidator = 
StandardValidators.DATA_SIZE_VALIDATOR;
+                               break;
+                       case ATTRIBUTE_RATE:
+                               rateValidator = 
StandardValidators.POSITIVE_LONG_VALIDATOR;
+                               final String rateAttr = 
context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+                               if (rateAttr == null) {
+                                       validationResults.add(new 
ValidationResult.Builder()
+                                               
.subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
+                                               .explanation("<Rate Controlled 
Attribute> property must be set if using <Rate Control Criteria> of 'attribute 
value'")
+                                               .build());
+                               }
+                               break;
+                       case FLOWFILE_RATE:
+                       default:
+                               rateValidator = 
StandardValidators.POSITIVE_LONG_VALIDATOR;
+                               break;
+               }
+
+               final ValidationResult rateResult = 
rateValidator.validate("Maximum Rate", 
context.getProperty(MAX_RATE).getValue(), context);
+               if (!rateResult.isValid()) {
+                       validationResults.add(rateResult);
+               }
+
+               return validationResults;
+       }
+
+       @Override
+       public void onPropertyModified(final PropertyDescriptor descriptor, 
final String oldValue, final String newValue) {
+               super.onPropertyModified(descriptor, oldValue, newValue);
+
+               if (descriptor.equals(RATE_CONTROL_CRITERIA)
+                       || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
+                       || descriptor.equals(GROUPING_ATTRIBUTE_NAME)
+                       || descriptor.equals(TIME_PERIOD)) {
+                       // if the criteria that is being used to determine 
limits/throttles is changed, we must clear our throttle map.
+                       throttleMap.clear();
+               } else if (descriptor.equals(MAX_RATE)) {
+                       final long newRate;
+                       if 
(DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
+                               newRate = DataUnit.parseDataSize(newValue, 
DataUnit.B).longValue();
+                       } else {
+                               newRate = Long.parseLong(newValue);
+                       }
+
+                       for (final Throttle throttle : throttleMap.values()) {
+                               throttle.setMaxRate(newRate);
+                       }
+               }
+       }
+
+       @Override
+       public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
+               final long lastClearTime = lastThrottleClearTime.get();
+               final long throttleExpirationMillis = 
System.currentTimeMillis() - 2 * 
context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+               if (lastClearTime < throttleExpirationMillis) {
+                       if (lastThrottleClearTime.compareAndSet(lastClearTime, 
System.currentTimeMillis())) {
+                               final Iterator<Map.Entry<String, Throttle>> itr 
= throttleMap.entrySet().iterator();
+                               while (itr.hasNext()) {
+                                       final Map.Entry<String, Throttle> entry 
= itr.next();
+                                       final Throttle throttle = 
entry.getValue();
+                                       if (throttle.tryLock()) {
+                                               try {
+                                                       if 
(throttle.lastUpdateTime() < lastClearTime) {
+                                                               itr.remove();
+                                                       }
+                                               } finally {
+                                                       throttle.unlock();
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               // TODO: Should periodically clear any Throttle that has not 
been used in more than 2 throttling periods
+               FlowFile flowFile = session.get();
+               if (flowFile == null) {
+                       return;
+               }
+
+               final ProcessorLog logger = getLogger();
+               final long seconds = 
context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
+               final String rateControlAttributeName = 
context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+               long rateValue;
+               switch 
(context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+                       case DATA_RATE:
+                               rateValue = flowFile.getSize();
+                               break;
+                       case FLOWFILE_RATE:
+                               rateValue = 1;
+                               break;
+                       case ATTRIBUTE_RATE:
+                               final String attributeValue = 
flowFile.getAttribute(rateControlAttributeName);
+                               if (attributeValue == null) {
+                                       logger.error("routing {} to 'failure' 
because FlowFile is missing required attribute {}", new Object[]{flowFile, 
rateControlAttributeName});
+                                       session.transfer(flowFile, REL_FAILURE);
+                                       return;
+                               }
+
+                               if 
(!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
+                                       logger.error("routing {} to 'failure' 
because FlowFile attribute {} has a value of {}, which is not a positive long",
+                                               new Object[]{flowFile, 
rateControlAttributeName, attributeValue});
+                                       session.transfer(flowFile, REL_FAILURE);
+                                       return;
+                               }
+                               rateValue = Long.parseLong(attributeValue);
+                               break;
+                       default:
+                               throw new AssertionError("<Rate Control 
Criteria> property set to illegal value of " + 
context.getProperty(RATE_CONTROL_CRITERIA).getValue());
+               }
+
+               final String groupingAttributeName = 
context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
+               final String groupName = (groupingAttributeName == null) ? 
DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
+               Throttle throttle = throttleMap.get(groupName);
+               if (throttle == null) {
+                       throttle = new Throttle((int) seconds, 
TimeUnit.SECONDS, logger);
+
+                       final String maxRateValue = 
context.getProperty(MAX_RATE).getValue();
+                       final long newRate;
+                       if 
(DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
+                               newRate = DataUnit.parseDataSize(maxRateValue, 
DataUnit.B).longValue();
+                       } else {
+                               newRate = Long.parseLong(maxRateValue);
+                       }
+                       throttle.setMaxRate(newRate);
+
+                       throttleMap.put(groupName, throttle);
+               }
+
+               throttle.lock();
+               try {
+                       if (throttle.tryAdd(rateValue)) {
+                               logger.info("transferring {} to 'success'", new 
Object[]{flowFile});
+                               session.transfer(flowFile, REL_SUCCESS);
+                       } else {
+                               flowFile = session.penalize(flowFile);
+                               session.transfer(flowFile);
+                       }
+               } finally {
+                       throttle.unlock();
+               }
+       }
+
+       private static class TimestampedLong {
+
+               private final Long value;
+               private final long timestamp = System.currentTimeMillis();
+
+               public TimestampedLong(final Long value) {
+                       this.value = value;
+               }
+
+               public Long getValue() {
+                       return value;
+               }
+
+               public long getTimestamp() {
+                       return timestamp;
+               }
+       }
+
+       private static class RateEntityAccess implements 
EntityAccess<TimestampedLong> {
+
+               @Override
+               public TimestampedLong aggregate(TimestampedLong oldValue, 
TimestampedLong toAdd) {
+                       if (oldValue == null && toAdd == null) {
+                               return new TimestampedLong(0L);
+                       } else if (oldValue == null) {
+                               return toAdd;
+                       } else if (toAdd == null) {
+                               return oldValue;
+                       }
+
+                       return new TimestampedLong(oldValue.getValue() + 
toAdd.getValue());
+               }
+
+               @Override
+               public TimestampedLong createNew() {
+                       return new TimestampedLong(0L);
+               }
+
+               @Override
+               public long getTimestamp(TimestampedLong entity) {
+                       return entity == null ? 0L : entity.getTimestamp();
+               }
+       }
+
+       private static class Throttle extends ReentrantLock {
+
+               private final AtomicLong maxRate = new AtomicLong(1L);
+               private final long timePeriodValue;
+               private final TimeUnit timePeriodUnit;
+               private final TimedBuffer<TimestampedLong> timedBuffer;
+               private final ProcessorLog logger;
+
+               private volatile long penalizationExpired;
+               private volatile long lastUpdateTime;
+
+               public Throttle(final int timePeriod, final TimeUnit unit, 
final ProcessorLog logger) {
+                       this.timePeriodUnit = unit;
+                       this.timePeriodValue = timePeriod;
+                       this.timedBuffer = new TimedBuffer<>(unit, timePeriod, 
new RateEntityAccess());
+                       this.logger = logger;
+               }
+
+               public void setMaxRate(final long maxRate) {
+                       this.maxRate.set(maxRate);
+               }
+
+               public long lastUpdateTime() {
+                       return lastUpdateTime;
+               }
+
+               public boolean tryAdd(final long value) {
+                       final long now = System.currentTimeMillis();
+                       if (penalizationExpired > now) {
+                               return false;
+                       }
+
+                       final long maxRateValue = maxRate.get();
+
+                       final TimestampedLong sum = 
timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, 
timePeriodUnit));
+                       if (sum != null && sum.getValue() >= maxRateValue) {
+                               logger.debug("current sum for throttle is {}, 
so not allowing rate of {} through", new Object[]{sum.getValue(), value});
+                               return false;
+                       }
+
+                       logger.debug("current sum for throttle is {}, so 
allowing rate of {} through",
+                               new Object[]{sum == null ? 0 : sum.getValue(), 
value});
+
+                       final long transferred = timedBuffer.add(new 
TimestampedLong(value)).getValue();
+                       if (transferred > maxRateValue) {
+                               final long amountOver = transferred - 
maxRateValue;
+                               // determine how long it should take to 
transfer 'amountOver' and 'penalize' the Throttle for that long
+                               final long milliDuration = 
TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
+                               final double pct = (double) amountOver / 
(double) maxRateValue;
+                               final long penalizationPeriod = (long) 
(milliDuration * pct);
+                               this.penalizationExpired = now + 
penalizationPeriod;
+                               logger.debug("allowing rate of {} through but 
penalizing Throttle for {} milliseconds", new Object[]{value, 
penalizationPeriod});
+                       }
+
+                       lastUpdateTime = now;
+                       return true;
+               }
+       }
 }

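A note on the ControlRate hunk above: aside from the added @InputRequirement annotation, the large removed/re-added block appears to be a pure reindentation (four-space indent to tabs); the logic itself is unchanged. The one piece of that logic worth calling out is the penalization math in Throttle.tryAdd(): when a FlowFile pushes the running sum past the configured maximum, the throttle is penalized for a fraction of the time period proportional to the overage. Below is a minimal, self-contained sketch of that calculation; the class name and the constant inputs are illustrative only, not NiFi API.

import java.util.concurrent.TimeUnit;

public class PenalizationSketch {
    public static void main(String[] args) {
        // Hypothetical inputs for illustration: the default Time Duration of
        // "1 min" and a maximum rate of 1 MB per period (Rate Control
        // Criteria = 'data rate').
        final long periodMillis = TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS);
        final long maxRate = 1_000_000L;      // bytes allowed per period
        final long transferred = 1_250_000L;  // running sum after admitting the current FlowFile

        if (transferred > maxRate) {
            final long amountOver = transferred - maxRate;               // 250,000 bytes over budget
            final double pct = (double) amountOver / (double) maxRate;   // 0.25 of one period
            final long penalizationPeriod = (long) (periodMillis * pct); // 15,000 ms
            // Mirrors Throttle.tryAdd above: further FlowFiles are rejected
            // until now + penalizationPeriod has passed.
            System.out.println("throttle penalized for " + penalizationPeriod + " ms");
        }
    }
}

In other words, admitting 25% more than the budget in one period costs a quarter of a period of downtime, which keeps the long-run average at or below the configured rate.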
http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
index a0a1364..7a99a59 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
@@ -33,8 +33,10 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.StreamCallback;
@@ -76,6 +78,7 @@ import java.util.concurrent.TimeUnit;
  */
 @EventDriven
 @SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @SupportsBatching
 @Tags({"text", "convert", "characterset", "character set"})
 @CapabilityDescription("Converts a FlowFile's content from one character set 
to another")

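ConvertCharacterSet, like the other processors in this commit, gains @InputRequirement(Requirement.INPUT_REQUIRED), declaring to the framework that the processor is only meaningful when an upstream connection feeds it FlowFiles. For readers following along, here is a minimal skeleton showing where the annotation sits; ExampleProcessor and its no-op body are hypothetical, while the annotation and imports are the ones added in the diffs above.

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

@InputRequirement(Requirement.INPUT_REQUIRED)
public class ExampleProcessor extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // INPUT_REQUIRED: nothing useful to do unless an incoming connection
        // has queued a FlowFile, so return when none is available.
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // Illustrative no-op: return the FlowFile to its input queue. A real
        // processor would transfer to a named Relationship such as 'success'.
        session.transfer(flowFile);
    }
}

The sibling enum values are Requirement.INPUT_FORBIDDEN (for source-style processors that generate their own FlowFiles) and Requirement.INPUT_ALLOWED; annotating each processor lets the framework validate flow wiring rather than leaving the expectation to documentation.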
http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 7eda593..9591960 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -34,10 +34,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -64,6 +66,7 @@ import org.codehaus.jackson.node.JsonNodeFactory;
 @SideEffectFree
 @SupportsBatching
 @SeeAlso(PutSQL.class)
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", 
"flat"})
 @CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or 
INSERT SQL statement. The incoming FlowFile is expected to be "
         + "\"flat\" JSON message, meaning that it consists of a single JSON 
element and each field maps to a simple type. If a field maps to "
