NIFI-810: Created RequiresInput annotation and ensure that processors are invalid if connections do not agree
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/034ee6de Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/034ee6de Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/034ee6de Branch: refs/heads/NIFI-810-InputRequirement Commit: 034ee6de6bc4c6923a835fbeaab4fb05fd694434 Parents: 96764ed Author: Mark Payne <marka...@hotmail.com> Authored: Fri Sep 25 11:39:28 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Fri Sep 25 11:39:58 2015 -0400 ---------------------------------------------------------------------- .../annotation/behavior/InputRequirement.java | 51 + .../src/main/asciidoc/developer-guide.adoc | 11 + .../nifi/processors/avro/ConvertAvroToJSON.java | 3 + .../processors/avro/ExtractAvroMetadata.java | 29 +- .../apache/nifi/processors/avro/SplitAvro.java | 27 +- .../nifi/processors/aws/s3/FetchS3Object.java | 3 + .../nifi/processors/aws/s3/PutS3Object.java | 6 +- .../apache/nifi/processors/aws/sns/PutSNS.java | 3 + .../nifi/processors/aws/sqs/DeleteSQS.java | 3 + .../apache/nifi/processors/aws/sqs/GetSQS.java | 5 +- .../apache/nifi/processors/aws/sqs/PutSQS.java | 5 +- .../nifi/processors/flume/ExecuteFlumeSink.java | 14 +- .../processors/flume/ExecuteFlumeSource.java | 14 +- .../apache/nifi/controller/ProcessorNode.java | 89 +- .../nifi/controller/StandardProcessorNode.java | 2440 +++++++++--------- .../org/apache/nifi/processors/GeoEnrichIP.java | 3 + .../hadoop/CreateHadoopSequenceFile.java | 4 +- .../nifi/processors/hadoop/FetchHDFS.java | 3 + .../apache/nifi/processors/hadoop/GetHDFS.java | 3 + .../apache/nifi/processors/hadoop/ListHDFS.java | 3 + .../apache/nifi/processors/hadoop/PutHDFS.java | 3 + .../processors/hl7/ExtractHL7Attributes.java | 3 + .../apache/nifi/processors/hl7/RouteHL7.java | 3 + .../processors/image/ExtractImageMetadata.java | 36 +- .../nifi/processors/image/ResizeImage.java | 38 +- .../apache/nifi/processors/kafka/GetKafka.java | 21 +- 
.../apache/nifi/processors/kafka/PutKafka.java | 10 +- .../nifi/processors/kite/ConvertCSVToAvro.java | 16 +- .../nifi/processors/kite/ConvertJSONToAvro.java | 14 +- .../processors/kite/StoreInKiteDataset.java | 9 +- .../nifi/processors/yandex/YandexTranslate.java | 3 + .../nifi-pcap-processors/.gitignore | 1 + .../nifi/processors/twitter/GetTwitter.java | 5 +- .../apache/nifi/processors/solr/GetSolr.java | 43 +- .../processors/solr/PutSolrContentStream.java | 33 +- .../standard/Base64EncodeContent.java | 171 +- .../processors/standard/CompressContent.java | 15 +- .../nifi/processors/standard/ControlRate.java | 683 ++--- .../standard/ConvertCharacterSet.java | 3 + .../processors/standard/ConvertJSONToSQL.java | 3 + .../processors/standard/DetectDuplicate.java | 3 + .../processors/standard/DistributeLoad.java | 3 + .../processors/standard/DuplicateFlowFile.java | 3 + .../nifi/processors/standard/EncodeContent.java | 15 +- .../processors/standard/EncryptContent.java | 3 + .../processors/standard/EvaluateJsonPath.java | 38 +- .../nifi/processors/standard/EvaluateXPath.java | 29 +- .../processors/standard/EvaluateXQuery.java | 25 +- .../processors/standard/ExecuteProcess.java | 3 + .../nifi/processors/standard/ExecuteSQL.java | 3 + .../standard/ExecuteStreamCommand.java | 7 +- .../nifi/processors/standard/ExtractText.java | 3 + .../processors/standard/GenerateFlowFile.java | 11 +- .../apache/nifi/processors/standard/GetFTP.java | 13 +- .../nifi/processors/standard/GetFile.java | 7 +- .../nifi/processors/standard/GetHTTP.java | 3 + .../nifi/processors/standard/GetJMSQueue.java | 3 + .../nifi/processors/standard/GetJMSTopic.java | 3 + .../nifi/processors/standard/GetSFTP.java | 7 +- .../processors/standard/HandleHttpRequest.java | 7 +- .../processors/standard/HandleHttpResponse.java | 5 +- .../nifi/processors/standard/HashAttribute.java | 5 +- .../nifi/processors/standard/HashContent.java | 5 +- .../processors/standard/IdentifyMimeType.java | 5 +- 
.../nifi/processors/standard/InvokeHTTP.java | 3 + .../nifi/processors/standard/ListenHTTP.java | 16 +- .../nifi/processors/standard/ListenUDP.java | 18 +- .../nifi/processors/standard/LogAttribute.java | 16 +- .../nifi/processors/standard/MergeContent.java | 11 +- .../nifi/processors/standard/ModifyBytes.java | 14 +- .../processors/standard/MonitorActivity.java | 31 +- .../nifi/processors/standard/PostHTTP.java | 3 + .../nifi/processors/standard/PutEmail.java | 3 + .../apache/nifi/processors/standard/PutFTP.java | 3 + .../nifi/processors/standard/PutFile.java | 3 + .../apache/nifi/processors/standard/PutJMS.java | 5 +- .../nifi/processors/standard/PutSFTP.java | 3 + .../apache/nifi/processors/standard/PutSQL.java | 3 + .../nifi/processors/standard/ReplaceText.java | 54 +- .../standard/ReplaceTextWithMapping.java | 18 +- .../processors/standard/RouteOnAttribute.java | 3 + .../processors/standard/RouteOnContent.java | 19 +- .../nifi/processors/standard/ScanAttribute.java | 19 +- .../nifi/processors/standard/ScanContent.java | 5 +- .../processors/standard/SegmentContent.java | 7 +- .../nifi/processors/standard/SplitContent.java | 7 +- .../nifi/processors/standard/SplitJson.java | 32 +- .../nifi/processors/standard/SplitText.java | 53 +- .../nifi/processors/standard/SplitXml.java | 18 +- .../nifi/processors/standard/TransformXml.java | 3 + .../nifi/processors/standard/UnpackContent.java | 9 +- .../nifi/processors/standard/ValidateXml.java | 16 +- .../processors/attributes/UpdateAttribute.java | 9 +- 93 files changed, 2418 insertions(+), 2027 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java 
b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java new file mode 100644 index 0000000..97e6b88 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java @@ -0,0 +1,51 @@ +package org.apache.nifi.annotation.behavior; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * <p> + * Annotation that a Processor can use to indicate whether it accepts, requires, or forbids + * input from other Processors. This information is used by the framework in order to ensure that + * a Processor is marked as invalid if it is missing necessary input or has input that will be ignored. + * This information also is used by the NiFi UI in order to prevent users from making connections + * to Processors that don't make sense. + * </p> + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface InputRequirement { + Requirement value(); + + public static enum Requirement { + /** + * This value is used to indicate that the Processor requires input from other Processors + * in order to run. As a result, the Processor will not be valid if it does not have any + * incoming connections. + */ + INPUT_REQUIRED, + + /** + * This value is used to indicate that the Processor will consume data from an incoming + * connection but does not require an incoming connection in order to perform its task. + * If the {@link InputRequirement} annotation is not present, this is the default value + * that is used. + */ + INPUT_ALLOWED, + + /** + * This value is used to indicate that the Processor is a "Source Processor" and does + * not accept incoming connections. 
Because the Processor does not pull FlowFiles from + * an incoming connection, it can be very confusing for users who create incoming connections + * to the Processor. As a result, this value can be used in order to clarify that incoming + * connections will not be used. This prevents the user from even creating such a connection. + */ + INPUT_FORBIDDEN; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-docs/src/main/asciidoc/developer-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi-docs/src/main/asciidoc/developer-guide.adoc index f9950d5..28df5c2 100644 --- a/nifi-docs/src/main/asciidoc/developer-guide.adoc +++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc @@ -1633,6 +1633,17 @@ will handle your Processor: not there is any data on an input queue. This is useful, for example, if the Processor needs to be triggered to run periodically to time out a network connection. + - `InputRequirement`: By default, all Processors will allow users to create incoming connections for the Processor, but + if the user does not create an incoming connection, the Processor is still valid and can be scheduled to run. For Processors + that are expected to be used as a "Source Processor," though, this can be confusing to the user, and the user may attempt to + send FlowFiles to that Processor, only for the FlowFiles to queue up without being processed. Conversely, if the Processor + expects incoming FlowFiles but does not have an input queue, the Processor will be scheduled to run but will perform no work, + as it will receive no FlowFile, and this leads to confusion as well. As a result, we can use the `@InputRequirement` annotation + and provide it a value of `INPUT_REQUIRED`, `INPUT_ALLOWED`, or `INPUT_FORBIDDEN`. 
This provides information to the framework + about when the Processor should be made invalid, or whether or not the user should even be able to draw a Connection to the + Processor. For instance, if a Processor is annotated with `@InputRequirement(Requirement.INPUT_FORBIDDEN)`, then the user will + not even be able to create a Connection with that Processor as the destination. + === Data Buffering http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java index 8832a73..b214427 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java @@ -29,6 +29,8 @@ import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -46,6 +48,7 @@ import org.apache.nifi.processor.io.StreamCallback; @SideEffectFree @SupportsBatching @Tags({ "json", "avro", "binary" }) +@InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Converts a Binary Avro record into a JSON object. 
This processor provides a direct mapping of an Avro field to a JSON field, such " + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this " + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of " http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java index 48aad7d..4cf5289 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java @@ -16,6 +16,19 @@ */ package org.apache.nifi.processors.avro; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + import org.apache.avro.Schema; import org.apache.avro.SchemaNormalization; import org.apache.avro.file.DataFileStream; @@ -23,6 +36,8 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.commons.codec.binary.Hex; import org.apache.commons.lang3.StringUtils; +import 
org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -41,22 +56,10 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; - @SideEffectFree @SupportsBatching @Tags({ "avro", "schema", "metadata" }) +@InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Extracts metadata from the header of an Avro datafile.") @WritesAttributes({ @WritesAttribute(attribute = "schema.type", description = "The type of the schema (i.e. 
record, enum, etc.)."), http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java index 3b344b5..dbf5778 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java @@ -16,6 +16,18 @@ */ package org.apache.nifi.processors.avro; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileConstants; import org.apache.avro.file.DataFileStream; @@ -26,6 +38,8 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -45,21 +59,10 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.util.ObjectHolder; -import 
java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - @SideEffectFree @SupportsBatching @Tags({ "avro", "split" }) +@InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Splits a binary encoded Avro datafile into smaller files based on the configured Output Size. The Output Strategy determines if " + "the smaller files will be Avro datafiles, or bare Avro records with metadata in the FlowFile attributes. The output will always be binary encoded.") public class SplitAvro extends AbstractProcessor { http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 2406b67..131e671 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; @@ -45,6 +47,7 @@ import 
com.amazonaws.services.s3.model.S3Object; @SupportsBatching @SeeAlso({PutS3Object.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Amazon", "S3", "AWS", "Get", "Fetch"}) @CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile") @WritesAttributes({ http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 24c82dd..7398c4e 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -54,6 +56,7 @@ import com.amazonaws.services.s3.model.StorageClass; @SupportsBatching @SeeAlso({FetchS3Object.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Amazon", "S3", "AWS", "Archive", "Put"}) @CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket") @DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", @@ -101,7 +104,8 @@ public class PutS3Object extends AbstractS3Processor { 
.build(); } - public void onTrigger(final ProcessContext context, final ProcessSession session) { + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { FlowFile flowFile = session.get(); if (flowFile == null) { return; http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java index 7d42703..e571ff4 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java @@ -23,6 +23,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; @@ -41,6 +43,7 @@ import com.amazonaws.services.sns.model.PublishRequest; @SupportsBatching @SeeAlso({GetSQS.class, PutSQS.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"}) @CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service") public class PutSNS extends AbstractSNSProcessor { 
http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java index 65e020d..f88aa71 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java @@ -21,6 +21,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; @@ -37,6 +39,7 @@ import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; @SupportsBatching @SeeAlso({GetSQS.class, PutSQS.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"}) @CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue") public class DeleteSQS extends AbstractSQSProcessor { http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java index 7c2dd2d..a140999 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; @@ -51,8 +53,9 @@ import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; @SupportsBatching +@SeeAlso({ PutSQS.class, DeleteSQS.class }) +@InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"}) -@SeeAlso({PutSQS.class, DeleteSQS.class}) @CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue") @WritesAttributes({ @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"), http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java index 3961f32..0af508e 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java 
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java @@ -28,6 +28,8 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; @@ -45,8 +47,9 @@ import com.amazonaws.services.sqs.model.SendMessageBatchRequest; import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; @SupportsBatching +@SeeAlso({ GetSQS.class, DeleteSQS.class }) +@InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Amazon", "AWS", "SQS", "Queue", "Put", "Publish"}) -@SeeAlso({GetSQS.class, DeleteSQS.class}) @CapabilityDescription("Publishes a message to an Amazon Simple Queuing Service Queue") @DynamicProperty(name = "The name of a Message Attribute to add to the message", value = "The value of the Message Attribute", description = "Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of " http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java index 57e0278..f93b215 100644 --- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java +++ 
b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java @@ -16,20 +16,19 @@ */ package org.apache.nifi.processors.flume; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import java.util.List; import java.util.Set; + import org.apache.flume.EventDeliveryException; import org.apache.flume.Sink; import org.apache.flume.conf.Configurables; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; - import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; import org.apache.nifi.processor.ProcessContext; @@ -40,12 +39,17 @@ import org.apache.nifi.processor.SchedulingContext; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + /** * This processor runs a Flume sink */ +@TriggerSerially @Tags({"flume", "hadoop", "put", "sink"}) +@InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Execute a Flume sink. 
Each input FlowFile is converted into a Flume Event for processing by the sink.") -@TriggerSerially public class ExecuteFlumeSink extends AbstractFlumeProcessor { public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java index 600f4b1..3aad6b7 100644 --- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java +++ b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java @@ -16,12 +16,10 @@ */ package org.apache.nifi.processors.flume; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; + import org.apache.flume.EventDeliveryException; import org.apache.flume.EventDrivenSource; import org.apache.flume.PollableSource; @@ -29,12 +27,13 @@ import org.apache.flume.Source; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurables; import org.apache.flume.source.EventDrivenSourceRunner; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.documentation.CapabilityDescription; import 
org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; - import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; import org.apache.nifi.processor.ProcessContext; @@ -46,12 +45,17 @@ import org.apache.nifi.processor.SchedulingContext; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + /** * This processor runs a Flume source */ +@TriggerSerially @Tags({"flume", "hadoop", "get", "source"}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) @CapabilityDescription("Execute a Flume source. Each Flume Event is sent to the success relationship as a FlowFile") -@TriggerSerially public class ExecuteFlumeSource extends AbstractFlumeProcessor { public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/034ee6de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index f2a83d0..2f72d0f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -20,6 +20,7 @@ import java.util.Map; 
import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; @@ -30,70 +31,72 @@ import org.apache.nifi.scheduling.SchedulingStrategy; public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable { - public ProcessorNode(final Processor processor, final String id, - final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { - super(processor, id, validationContextFactory, serviceProvider); - } + public ProcessorNode(final Processor processor, final String id, + final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { + super(processor, id, validationContextFactory, serviceProvider); + } - public abstract boolean isIsolated(); + public abstract boolean isIsolated(); - public abstract boolean isTriggerWhenAnyDestinationAvailable(); + public abstract boolean isTriggerWhenAnyDestinationAvailable(); - @Override - public abstract boolean isSideEffectFree(); + @Override + public abstract boolean isSideEffectFree(); - public abstract boolean isTriggeredSerially(); + public abstract boolean isTriggeredSerially(); - public abstract boolean isEventDrivenSupported(); + public abstract boolean isEventDrivenSupported(); - public abstract boolean isHighThroughputSupported(); + public abstract boolean isHighThroughputSupported(); - @Override - public abstract boolean isValid(); + public abstract Requirement getInputRequirement(); - public abstract void setScheduledState(ScheduledState scheduledState); + @Override + public abstract boolean isValid(); - public abstract void setBulletinLevel(LogLevel bulletinLevel); + public abstract void setScheduledState(ScheduledState scheduledState); - public 
abstract LogLevel getBulletinLevel(); + public abstract void setBulletinLevel(LogLevel bulletinLevel); - public abstract Processor getProcessor(); + public abstract LogLevel getBulletinLevel(); - public abstract void yield(long period, TimeUnit timeUnit); + public abstract Processor getProcessor(); - public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships); + public abstract void yield(long period, TimeUnit timeUnit); - public abstract Set<Relationship> getAutoTerminatedRelationships(); + public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships); - public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy); + public abstract Set<Relationship> getAutoTerminatedRelationships(); - @Override - public abstract SchedulingStrategy getSchedulingStrategy(); + public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy); - public abstract void setRunDuration(long duration, TimeUnit timeUnit); + @Override + public abstract SchedulingStrategy getSchedulingStrategy(); - public abstract long getRunDuration(TimeUnit timeUnit); + public abstract void setRunDuration(long duration, TimeUnit timeUnit); - public abstract Map<String, String> getStyle(); + public abstract long getRunDuration(TimeUnit timeUnit); - public abstract void setStyle(Map<String, String> style); + public abstract Map<String, String> getStyle(); - /** - * @return the number of threads (concurrent tasks) currently being used by - * this Processor - */ - public abstract int getActiveThreadCount(); + public abstract void setStyle(Map<String, String> style); - /** - * Verifies that this Processor can be started if the provided set of - * services are enabled. This is introduced because we need to verify that - * all components can be started before starting any of them. 
In order to do - * that, we need to know that this component can be started if the given - * services are enabled, as we will then enable the given services before - * starting this component. - * - * @param ignoredReferences to ignore - */ - public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences); + /** + * @return the number of threads (concurrent tasks) currently being used by + * this Processor + */ + public abstract int getActiveThreadCount(); + + /** + * Verifies that this Processor can be started if the provided set of + * services are enabled. This is introduced because we need to verify that + * all components can be started before starting any of them. In order to do + * that, we need to know that this component can be started if the given + * services are enabled, as we will then enable the given services before + * starting this component. + * + * @param ignoredReferences to ignore + */ + public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences); }