http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java index cbcc54d..385ac73 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java @@ -23,7 +23,8 @@ import java.util.List; import java.util.Set; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; @@ -57,6 +58,7 @@ import org.apache.nifi.processors.hadoop.util.SequenceFileWriter; * */ @SideEffectFree +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"hadoop", "sequence file", "create", "sequencefile"}) @CapabilityDescription("Creates Hadoop Sequence Files from incoming flow files") @SeeAlso(PutHDFS.class)
http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java index 4a52fb7..aa03e73 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -29,6 +29,8 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.AccessControlException; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -44,6 +46,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; @SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"}) @CapabilityDescription("Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. " + "The file in HDFS is left intact without any changes being made to it.") http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index de776d4..4c9deea 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@ -41,6 +41,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; @@ -62,6 +64,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; @TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_ALLOWED) @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"}) @CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.") @WritesAttributes({ http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 151cbf2..563bda8 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -66,6 +68,7 @@ import org.codehaus.jackson.map.ObjectMapper; @TriggerSerially @TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"}) @CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents " + "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only " http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 901159b..bedf1b9 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.ipc.RemoteException; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; @@ -59,6 +61,7 @@ import org.apache.nifi.util.StopWatch; /** * This processor copies FlowFiles to HDFS. */ +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"hadoop", "HDFS", "put", "copy", "filesystem"}) @CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)") @WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.") http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java index 574fb2d..3a6ac79 100644 --- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java +++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -57,6 +59,7 @@ import ca.uhn.hl7v2.validation.impl.ValidationContextFactory; @SideEffectFree @SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"HL7", "health level 7", "healthcare", "extract", "attributes"}) @CapabilityDescription("Extracts information from an HL7 (Health Level 7) formatted FlowFile and adds the information as FlowFile Attributes. " + "The attributes are named as <Segment Name> <dot> <Field Index>. If the segment is repeating, the naming will be " http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java index 53e7e69..26e8bb6 100644 --- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java +++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java @@ -29,6 +29,8 @@ import java.util.Set; import org.apache.nifi.annotation.behavior.DynamicProperties; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -63,6 +65,7 @@ import ca.uhn.hl7v2.validation.impl.ValidationContextFactory; @EventDriven @SideEffectFree @SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"HL7", "healthcare", "route", "Health Level 7"}) @DynamicProperties({ @DynamicProperty(name = "Name of a Relationship", value = "An HL7 Query Language query", http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java index 7fe6195..b44eccd 100644 --- a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java +++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ExtractImageMetadata.java @@ -16,11 +16,18 @@ */ package org.apache.nifi.processors.image; -import com.drew.imaging.ImageMetadataReader; -import com.drew.imaging.ImageProcessingException; -import com.drew.metadata.Directory; -import com.drew.metadata.Metadata; -import com.drew.metadata.Tag; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; @@ -30,25 +37,22 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.ObjectHolder; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.HashSet; -import java.util.Map; -import java.util.HashMap; +import com.drew.imaging.ImageMetadataReader; +import com.drew.imaging.ImageProcessingException; +import com.drew.metadata.Directory; +import com.drew.metadata.Metadata; +import com.drew.metadata.Tag; +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Exif", "Exchangeable", "image", "file", "format", "JPG", "GIF", "PNG", "BMP", "metadata","IPTC", "XMP"}) @CapabilityDescription("Extract the image metadata from flowfiles containing images. This processor relies on this " + "metadata extractor library https://github.com/drewnoakes/metadata-extractor. It extracts a long list of " http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java index c085b5f..176561f 100644 --- a/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java +++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java @@ -17,7 +17,27 @@ package org.apache.nifi.processors.image; +import java.awt.Graphics2D; +import java.awt.Image; +import java.awt.image.BufferedImage; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import javax.imageio.ImageIO; +import javax.imageio.ImageReader; +import javax.imageio.stream.ImageInputStream; + import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -33,25 +53,9 @@ import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; -import javax.imageio.ImageIO; -import javax.imageio.ImageReader; -import javax.imageio.stream.ImageInputStream; -import java.awt.Image; -import java.awt.Graphics2D; -import java.awt.image.BufferedImage; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Set; -import java.util.HashSet; -import java.util.List; -import java.util.Iterator; -import java.util.concurrent.TimeUnit; - @EventDriven @SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({ "resize", "image", "jpg", "jpeg", "png", "bmp", "wbmp", "gif" }) @CapabilityDescription("Resizes an image to user-specified dimensions. This Processor uses the image codecs registered with the " + "environment that NiFi is running in. By default, this includes JPEG, PNG, BMP, WBMP, and GIF images.") http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java index 26590df..e10977b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java @@ -32,18 +32,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; - +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; @@ -58,7 +53,15 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.MessageAndMetadata; + @SupportsBatching +@InputRequirement(Requirement.INPUT_FORBIDDEN) @CapabilityDescription("Fetches messages from Apache Kafka") @Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"}) @WritesAttributes({ http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index d83c7bf..cff285c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -30,10 +30,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; - +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -58,9 +56,13 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; import org.apache.nifi.util.LongHolder; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; import scala.actors.threadpool.Arrays; @SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka") public class PutKafka extends AbstractProcessor { http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java index 6c20a8f..6f126aa 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java @@ -18,18 +18,20 @@ */ package org.apache.nifi.processors.kite; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; +import static org.apache.nifi.processor.util.StandardValidators.createLongValidator; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.List; import java.util.Set; + import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData.Record; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -53,11 +55,13 @@ import org.kitesdk.data.spi.DefaultConfiguration; import org.kitesdk.data.spi.filesystem.CSVFileReader; import org.kitesdk.data.spi.filesystem.CSVProperties; -import static org.apache.nifi.processor.util.StandardValidators.createLongValidator; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; @Tags({"kite", "csv", "avro"}) -@CapabilityDescription( - "Converts CSV files to Avro according to an Avro Schema") +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Converts CSV files to Avro according to an Avro Schema") public class ConvertCSVToAvro extends AbstractKiteProcessor { private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java index ec1503c..af120bf 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java @@ -18,18 +18,18 @@ */ package org.apache.nifi.processors.kite; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.List; import java.util.Set; + import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData.Record; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -47,9 +47,13 @@ import org.kitesdk.data.SchemaNotFoundException; import org.kitesdk.data.spi.DefaultConfiguration; import org.kitesdk.data.spi.filesystem.JSONFileReader; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + @Tags({"kite", "json", "avro"}) -@CapabilityDescription( - "Converts JSON files to Avro according to an Avro Schema") +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Converts JSON files to Avro according to an Avro Schema") public class ConvertJSONToAvro extends AbstractKiteProcessor { private static final Relationship SUCCESS = new Relationship.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java index 7a30db1..1986f0b 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java @@ -18,16 +18,17 @@ */ package org.apache.nifi.processors.kite; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; + import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData.Record; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -46,6 +47,10 @@ import org.kitesdk.data.ValidationException; import org.kitesdk.data.View; import org.kitesdk.data.spi.SchemaValidationUtil; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"kite", "avro", "parquet", "hadoop", "hive", "hdfs", "hbase"}) @CapabilityDescription("Stores Avro records in a Kite dataset") public class StoreInKiteDataset extends AbstractKiteProcessor { http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java b/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java index 8398152..5f58781 100644 --- a/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java +++ b/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java @@ -33,6 +33,8 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; @@ -70,6 +72,7 @@ import com.sun.jersey.api.json.JSONConfiguration; import com.sun.jersey.core.util.MultivaluedMapImpl; @SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"yandex", "translate", "translation", "language"}) @CapabilityDescription("Translates content and attributes from one language to another") @WritesAttributes({ http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/.gitignore @@ -0,0 +1 @@ +/bin/ http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java index a78b112..e41b583 100644 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java +++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java @@ -32,6 +32,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Pattern; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -57,8 +59,8 @@ import org.apache.nifi.processor.util.StandardValidators; import com.twitter.hbc.ClientBuilder; import com.twitter.hbc.core.Client; import com.twitter.hbc.core.Constants; -import com.twitter.hbc.core.endpoint.Location.Coordinate ; import com.twitter.hbc.core.endpoint.Location ; +import com.twitter.hbc.core.endpoint.Location.Coordinate ; import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint; import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint; import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint; @@ -69,6 +71,7 @@ import com.twitter.hbc.httpclient.auth.Authentication; import com.twitter.hbc.httpclient.auth.OAuth1; @SupportsBatching +@InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"twitter", "tweets", "social media", "status", "json"}) @CapabilityDescription("Pulls status changes from Twitter's streaming API") @WritesAttribute(attribute = "mime.type", description = "Sets mime type to application/json") http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java index ff264a1..a85aa0f 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java @@ -18,7 +18,29 @@ */ package org.apache.nifi.processors.solr; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.Set; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnRemoved; @@ -41,27 +63,8 @@ import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Properties; -import java.util.Set; -import java.util.TimeZone; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - @Tags({"Apache", "Solr", "Get", "Pull"}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) @CapabilityDescription("Queries Solr and outputs the results as a FlowFile") public class GetSolr extends SolrProcessor { http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java index 560ad34..df034c9 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java @@ -18,7 +18,24 @@ */ package org.apache.nifi.processors.solr; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -40,22 +57,8 @@ import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.MultiMapSolrParams; import org.apache.solr.common.util.ContentStreamBase; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.TimeUnit; - @Tags({"Apache", "Solr", "Put", "Send"}) +@InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Sends the contents of a FlowFile as a ContentStream to Solr") @DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value", description="These parameters will be passed to Solr on the request") http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java index 9887e38..816b407 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java @@ -29,6 +29,8 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.codec.binary.Base64InputStream; import org.apache.commons.codec.binary.Base64OutputStream; import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -51,101 +53,102 @@ import org.apache.nifi.util.StopWatch; @SupportsBatching @Tags({"encode", "base64"}) @CapabilityDescription("Encodes or decodes content to and from base64") +@InputRequirement(Requirement.INPUT_REQUIRED) public class Base64EncodeContent extends AbstractProcessor { - public static final String ENCODE_MODE = "Encode"; - public static final String DECODE_MODE = "Decode"; + public static final String ENCODE_MODE = "Encode"; + public static final String DECODE_MODE = "Decode"; - public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() - .name("Mode") - .description("Specifies whether the content should be encoded or decoded") - .required(true) - .allowableValues(ENCODE_MODE, DECODE_MODE) - .defaultValue(ENCODE_MODE) - .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Any FlowFile that is successfully encoded or decoded will be routed to success") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that cannot be encoded or decoded will be routed to failure") - .build(); + public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() + .name("Mode") + .description("Specifies whether the content should be encoded or decoded") + .required(true) + .allowableValues(ENCODE_MODE, DECODE_MODE) + .defaultValue(ENCODE_MODE) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any FlowFile that is successfully encoded or decoded will be routed to success") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be encoded or decoded will be routed to failure") + .build(); - private List<PropertyDescriptor> properties; - private Set<Relationship> relationships; + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; - @Override - protected void init(final ProcessorInitializationContext context) { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(MODE); - this.properties = Collections.unmodifiableList(properties); + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(MODE); + this.properties = Collections.unmodifiableList(properties); - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); - } + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } - @Override - public Set<Relationship> getRelationships() { - return relationships; - } + @Override + public Set<Relationship> getRelationships() { + return relationships; + } - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } - final ProcessorLog logger = getLogger(); + final ProcessorLog logger = getLogger(); - boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE); - try { - final StopWatch stopWatch = new StopWatch(true); - if (encode) { - flowFile = session.write(flowFile, new StreamCallback() { - @Override - public void process(InputStream in, OutputStream out) throws IOException { - try (Base64OutputStream bos = new Base64OutputStream(out)) { - int len = -1; - byte[] buf = new byte[8192]; - while ((len = in.read(buf)) > 0) { - bos.write(buf, 0, len); - } - bos.flush(); - } - } - }); - } else { - flowFile = session.write(flowFile, new StreamCallback() { - @Override - public void process(InputStream in, OutputStream out) throws IOException { - try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) { - int len = -1; - byte[] buf = new byte[8192]; - while ((len = bis.read(buf)) > 0) { - out.write(buf, 0, len); - } - out.flush(); - } - } - }); - } + boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE); + try { + final StopWatch stopWatch = new StopWatch(true); + if (encode) { + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + try (Base64OutputStream bos = new Base64OutputStream(out)) { + int len = -1; + byte[] buf = new byte[8192]; + while ((len = in.read(buf)) > 0) { + bos.write(buf, 0, len); + } + bos.flush(); + } + } + }); + } else { + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) { + int len = -1; + byte[] buf = new byte[8192]; + while ((len = bis.read(buf)) > 0) { + out.write(buf, 0, len); + } + out.flush(); + } + } + }); + } - logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile}); - session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(flowFile, REL_SUCCESS); - } catch (ProcessException e) { - logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - } - } + logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile}); + session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + } catch (ProcessException e) { + logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java index 1b9b20c..593cf44 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java @@ -29,20 +29,18 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import lzma.sdk.lzma.Decoder; -import lzma.streams.LzmaInputStream; -import lzma.streams.LzmaOutputStream; - import org.apache.commons.compress.compressors.CompressorStreamFactory; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -63,9 +61,14 @@ import org.tukaani.xz.LZMA2Options; import org.tukaani.xz.XZInputStream; import org.tukaani.xz.XZOutputStream; +import lzma.sdk.lzma.Decoder; +import lzma.streams.LzmaInputStream; +import lzma.streams.LzmaOutputStream; + @EventDriven @SideEffectFree @SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2"}) @CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type " + "attribute as appropriate") http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index 2efc852..a45c211 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -31,6 +31,12 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -43,10 +49,6 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.timebuffer.EntityAccess; @@ -54,344 +56,345 @@ import org.apache.nifi.util.timebuffer.TimedBuffer; @SideEffectFree @TriggerSerially +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"rate control", "throttle", "rate", "throughput"}) @CapabilityDescription("Controls the rate at which data is transferred to follow-on processors.") public class ControlRate extends AbstractProcessor { - public static final String DATA_RATE = "data rate"; - public static final String FLOWFILE_RATE = "flowfile count"; - public static final String ATTRIBUTE_RATE = "attribute value"; - - public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder() - .name("Rate Control Criteria") - .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.") - .required(true) - .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE) - .defaultValue(DATA_RATE) - .build(); - public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder() - .name("Maximum Rate") - .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a " - + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria - .build(); - public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() - .name("Rate Controlled Attribute") - .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. " - + "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. " - + "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); - public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder() - .name("Time Duration") - .description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.") - .required(true) - .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) - .defaultValue("1 min") - .build(); - public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() - .name("Grouping Attribute") - .description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for " - + "each value specified by the attribute with this name. Changing this value resets the rate counters.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All FlowFiles are transferred to this relationship") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format") - .build(); - - private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*"); - private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###"; - - private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>(); - private List<PropertyDescriptor> properties; - private Set<Relationship> relationships; - private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis()); - - @Override - protected void init(final ProcessorInitializationContext context) { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(RATE_CONTROL_CRITERIA); - properties.add(MAX_RATE); - properties.add(RATE_CONTROL_ATTRIBUTE_NAME); - properties.add(TIME_PERIOD); - properties.add(GROUPING_ATTRIBUTE_NAME); - this.properties = Collections.unmodifiableList(properties); - - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - this.relationships = Collections.unmodifiableSet(relationships); - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } - - @Override - public Set<Relationship> getRelationships() { - return relationships; - } - - @Override - protected Collection<ValidationResult> customValidate(final ValidationContext context) { - final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context)); - - final Validator rateValidator; - switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) { - case DATA_RATE: - rateValidator = StandardValidators.DATA_SIZE_VALIDATOR; - break; - case ATTRIBUTE_RATE: - rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR; - final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); - if (rateAttr == null) { - validationResults.add(new ValidationResult.Builder() - .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName()) - .explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'") - .build()); - } - break; - case FLOWFILE_RATE: - default: - rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR; - break; - } - - final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context); - if (!rateResult.isValid()) { - validationResults.add(rateResult); - } - - return validationResults; - } - - @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - super.onPropertyModified(descriptor, oldValue, newValue); - - if (descriptor.equals(RATE_CONTROL_CRITERIA) - || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME) - || descriptor.equals(GROUPING_ATTRIBUTE_NAME) - || descriptor.equals(TIME_PERIOD)) { - // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map. - throttleMap.clear(); - } else if (descriptor.equals(MAX_RATE)) { - final long newRate; - if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) { - newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue(); - } else { - newRate = Long.parseLong(newValue); - } - - for (final Throttle throttle : throttleMap.values()) { - throttle.setMaxRate(newRate); - } - } - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final long lastClearTime = lastThrottleClearTime.get(); - final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS); - if (lastClearTime < throttleExpirationMillis) { - if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) { - final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator(); - while (itr.hasNext()) { - final Map.Entry<String, Throttle> entry = itr.next(); - final Throttle throttle = entry.getValue(); - if (throttle.tryLock()) { - try { - if (throttle.lastUpdateTime() < lastClearTime) { - itr.remove(); - } - } finally { - throttle.unlock(); - } - } - } - } - } - - // TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final ProcessorLog logger = getLogger(); - final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS); - final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); - long rateValue; - switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) { - case DATA_RATE: - rateValue = flowFile.getSize(); - break; - case FLOWFILE_RATE: - rateValue = 1; - break; - case ATTRIBUTE_RATE: - final String attributeValue = flowFile.getAttribute(rateControlAttributeName); - if (attributeValue == null) { - logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[]{flowFile, rateControlAttributeName}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) { - logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long", - new Object[]{flowFile, rateControlAttributeName, attributeValue}); - session.transfer(flowFile, REL_FAILURE); - return; - } - rateValue = Long.parseLong(attributeValue); - break; - default: - throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue()); - } - - final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue(); - final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName); - Throttle throttle = throttleMap.get(groupName); - if (throttle == null) { - throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger); - - final String maxRateValue = context.getProperty(MAX_RATE).getValue(); - final long newRate; - if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) { - newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue(); - } else { - newRate = Long.parseLong(maxRateValue); - } - throttle.setMaxRate(newRate); - - throttleMap.put(groupName, throttle); - } - - throttle.lock(); - try { - if (throttle.tryAdd(rateValue)) { - logger.info("transferring {} to 'success'", new Object[]{flowFile}); - session.transfer(flowFile, REL_SUCCESS); - } else { - flowFile = session.penalize(flowFile); - session.transfer(flowFile); - } - } finally { - throttle.unlock(); - } - } - - private static class TimestampedLong { - - private final Long value; - private final long timestamp = System.currentTimeMillis(); - - public TimestampedLong(final Long value) { - this.value = value; - } - - public Long getValue() { - return value; - } - - public long getTimestamp() { - return timestamp; - } - } - - private static class RateEntityAccess implements EntityAccess<TimestampedLong> { - - @Override - public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) { - if (oldValue == null && toAdd == null) { - return new TimestampedLong(0L); - } else if (oldValue == null) { - return toAdd; - } else if (toAdd == null) { - return oldValue; - } - - return new TimestampedLong(oldValue.getValue() + toAdd.getValue()); - } - - @Override - public TimestampedLong createNew() { - return new TimestampedLong(0L); - } - - @Override - public long getTimestamp(TimestampedLong entity) { - return entity == null ? 0L : entity.getTimestamp(); - } - } - - private static class Throttle extends ReentrantLock { - - private final AtomicLong maxRate = new AtomicLong(1L); - private final long timePeriodValue; - private final TimeUnit timePeriodUnit; - private final TimedBuffer<TimestampedLong> timedBuffer; - private final ProcessorLog logger; - - private volatile long penalizationExpired; - private volatile long lastUpdateTime; - - public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) { - this.timePeriodUnit = unit; - this.timePeriodValue = timePeriod; - this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess()); - this.logger = logger; - } - - public void setMaxRate(final long maxRate) { - this.maxRate.set(maxRate); - } - - public long lastUpdateTime() { - return lastUpdateTime; - } - - public boolean tryAdd(final long value) { - final long now = System.currentTimeMillis(); - if (penalizationExpired > now) { - return false; - } - - final long maxRateValue = maxRate.get(); - - final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit)); - if (sum != null && sum.getValue() >= maxRateValue) { - logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[]{sum.getValue(), value}); - return false; - } - - logger.debug("current sum for throttle is {}, so allowing rate of {} through", - new Object[]{sum == null ? 0 : sum.getValue(), value}); - - final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue(); - if (transferred > maxRateValue) { - final long amountOver = transferred - maxRateValue; - // determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long - final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit); - final double pct = (double) amountOver / (double) maxRateValue; - final long penalizationPeriod = (long) (milliDuration * pct); - this.penalizationExpired = now + penalizationPeriod; - logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod}); - } - - lastUpdateTime = now; - return true; - } - } + public static final String DATA_RATE = "data rate"; + public static final String FLOWFILE_RATE = "flowfile count"; + public static final String ATTRIBUTE_RATE = "attribute value"; + + public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder() + .name("Rate Control Criteria") + .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.") + .required(true) + .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE) + .defaultValue(DATA_RATE) + .build(); + public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder() + .name("Maximum Rate") + .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a " + + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria + .build(); + public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() + .name("Rate Controlled Attribute") + .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. " + + "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. " + + "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder() + .name("Time Duration") + .description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.") + .required(true) + .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) + .defaultValue("1 min") + .build(); + public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() + .name("Grouping Attribute") + .description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for " + + "each value specified by the attribute with this name. Changing this value resets the rate counters.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles are transferred to this relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format") + .build(); + + private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*"); + private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###"; + + private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>(); + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis()); + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(RATE_CONTROL_CRITERIA); + properties.add(MAX_RATE); + properties.add(RATE_CONTROL_ATTRIBUTE_NAME); + properties.add(TIME_PERIOD); + properties.add(GROUPING_ATTRIBUTE_NAME); + this.properties = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext context) { + final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context)); + + final Validator rateValidator; + switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) { + case DATA_RATE: + rateValidator = StandardValidators.DATA_SIZE_VALIDATOR; + break; + case ATTRIBUTE_RATE: + rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR; + final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); + if (rateAttr == null) { + validationResults.add(new ValidationResult.Builder() + .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName()) + .explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'") + .build()); + } + break; + case FLOWFILE_RATE: + default: + rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR; + break; + } + + final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context); + if (!rateResult.isValid()) { + validationResults.add(rateResult); + } + + return validationResults; + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + super.onPropertyModified(descriptor, oldValue, newValue); + + if (descriptor.equals(RATE_CONTROL_CRITERIA) + || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME) + || descriptor.equals(GROUPING_ATTRIBUTE_NAME) + || descriptor.equals(TIME_PERIOD)) { + // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map. + throttleMap.clear(); + } else if (descriptor.equals(MAX_RATE)) { + final long newRate; + if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) { + newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue(); + } else { + newRate = Long.parseLong(newValue); + } + + for (final Throttle throttle : throttleMap.values()) { + throttle.setMaxRate(newRate); + } + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final long lastClearTime = lastThrottleClearTime.get(); + final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS); + if (lastClearTime < throttleExpirationMillis) { + if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) { + final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator(); + while (itr.hasNext()) { + final Map.Entry<String, Throttle> entry = itr.next(); + final Throttle throttle = entry.getValue(); + if (throttle.tryLock()) { + try { + if (throttle.lastUpdateTime() < lastClearTime) { + itr.remove(); + } + } finally { + throttle.unlock(); + } + } + } + } + } + + // TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ProcessorLog logger = getLogger(); + final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS); + final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); + long rateValue; + switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) { + case DATA_RATE: + rateValue = flowFile.getSize(); + break; + case FLOWFILE_RATE: + rateValue = 1; + break; + case ATTRIBUTE_RATE: + final String attributeValue = flowFile.getAttribute(rateControlAttributeName); + if (attributeValue == null) { + logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[]{flowFile, rateControlAttributeName}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) { + logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long", + new Object[]{flowFile, rateControlAttributeName, attributeValue}); + session.transfer(flowFile, REL_FAILURE); + return; + } + rateValue = Long.parseLong(attributeValue); + break; + default: + throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue()); + } + + final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue(); + final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName); + Throttle throttle = throttleMap.get(groupName); + if (throttle == null) { + throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger); + + final String maxRateValue = context.getProperty(MAX_RATE).getValue(); + final long newRate; + if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) { + newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue(); + } else { + newRate = Long.parseLong(maxRateValue); + } + throttle.setMaxRate(newRate); + + throttleMap.put(groupName, throttle); + } + + throttle.lock(); + try { + if (throttle.tryAdd(rateValue)) { + logger.info("transferring {} to 'success'", new Object[]{flowFile}); + session.transfer(flowFile, REL_SUCCESS); + } else { + flowFile = session.penalize(flowFile); + session.transfer(flowFile); + } + } finally { + throttle.unlock(); + } + } + + private static class TimestampedLong { + + private final Long value; + private final long timestamp = System.currentTimeMillis(); + + public TimestampedLong(final Long value) { + this.value = value; + } + + public Long getValue() { + return value; + } + + public long getTimestamp() { + return timestamp; + } + } + + private static class RateEntityAccess implements EntityAccess<TimestampedLong> { + + @Override + public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) { + if (oldValue == null && toAdd == null) { + return new TimestampedLong(0L); + } else if (oldValue == null) { + return toAdd; + } else if (toAdd == null) { + return oldValue; + } + + return new TimestampedLong(oldValue.getValue() + toAdd.getValue()); + } + + @Override + public TimestampedLong createNew() { + return new TimestampedLong(0L); + } + + @Override + public long getTimestamp(TimestampedLong entity) { + return entity == null ? 0L : entity.getTimestamp(); + } + } + + private static class Throttle extends ReentrantLock { + + private final AtomicLong maxRate = new AtomicLong(1L); + private final long timePeriodValue; + private final TimeUnit timePeriodUnit; + private final TimedBuffer<TimestampedLong> timedBuffer; + private final ProcessorLog logger; + + private volatile long penalizationExpired; + private volatile long lastUpdateTime; + + public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) { + this.timePeriodUnit = unit; + this.timePeriodValue = timePeriod; + this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess()); + this.logger = logger; + } + + public void setMaxRate(final long maxRate) { + this.maxRate.set(maxRate); + } + + public long lastUpdateTime() { + return lastUpdateTime; + } + + public boolean tryAdd(final long value) { + final long now = System.currentTimeMillis(); + if (penalizationExpired > now) { + return false; + } + + final long maxRateValue = maxRate.get(); + + final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit)); + if (sum != null && sum.getValue() >= maxRateValue) { + logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[]{sum.getValue(), value}); + return false; + } + + logger.debug("current sum for throttle is {}, so allowing rate of {} through", + new Object[]{sum == null ? 0 : sum.getValue(), value}); + + final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue(); + if (transferred > maxRateValue) { + final long amountOver = transferred - maxRateValue; + // determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long + final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit); + final double pct = (double) amountOver / (double) maxRateValue; + final long penalizationPeriod = (long) (milliDuration * pct); + this.penalizationExpired = now + penalizationPeriod; + logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod}); + } + + lastUpdateTime = now; + return true; + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java index a0a1364..7a99a59 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java @@ -33,8 +33,10 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; @@ -76,6 +78,7 @@ import java.util.concurrent.TimeUnit; */ @EventDriven @SideEffectFree +@InputRequirement(Requirement.INPUT_REQUIRED) @SupportsBatching @Tags({"text", "convert", "characterset", "character set"}) @CapabilityDescription("Converts a FlowFile's content from one character set to another") http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index 7eda593..9591960 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -34,10 +34,12 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -64,6 +66,7 @@ import org.codehaus.jackson.node.JsonNodeFactory; @SideEffectFree @SupportsBatching @SeeAlso(PutSQL.class) +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"}) @CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be " + "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to "