//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
NiFi Developer's Guide
======================
Apache NiFi Team <d...@nifi.apache.org>
:homepage: http://nifi.apache.org


== Introduction

The intent of this Developer Guide is to provide the reader with the information needed to understand how Apache NiFi extensions are developed and to explain the thought process behind developing the components. It provides an introduction to and explanation of the API that is used to develop extensions. It does not, however, go into great detail about each of the methods in the API, as this guide is intended to supplement the JavaDocs of the API rather than replace them. This guide also assumes that the reader is familiar with Java 7 and Apache Maven.

This guide is written by developers for developers. It is expected that before reading this guide, you have a basic understanding of NiFi and the concepts of dataflow. If not, please see the link:overview.html[NiFi Overview] and the link:user-guide.html[NiFi User Guide] to familiarize yourself with the concepts of NiFi.


[[components]]
== NiFi Components

NiFi provides several extension points to give developers the ability to add functionality to the application to meet their needs. The following list provides a high-level description of the most common extension points:

- Processor
  * The Processor interface is the mechanism through which NiFi exposes access to <<flowfile>>s, their attributes, and their content. The Processor is the basic building block used to compose a NiFi dataflow. This interface is used to accomplish all of the following tasks:

  ** Create FlowFiles
  ** Read FlowFile content
  ** Write FlowFile content
  ** Read FlowFile attributes
  ** Update FlowFile attributes
  ** Ingest data
  ** Egress data
  ** Route data
  ** Extract data
  ** Modify data

- ReportingTask
  * The ReportingTask interface is a mechanism that NiFi exposes to allow metrics, monitoring information, and internal NiFi state to be published to external endpoints, such as log files, e-mail, and remote web services.

- ControllerService
  * A ControllerService provides shared state and functionality across Processors, other ControllerServices, and ReportingTasks within a single JVM.
An example use case may include loading a very large dataset into memory. By performing this work in a ControllerService, the data can be loaded once and be exposed to all Processors via this service, rather than requiring many different Processors to load the dataset themselves.

- FlowFilePrioritizer
  * The FlowFilePrioritizer interface provides a mechanism by which <<flowfile>>s in a queue can be prioritized, or sorted, so that the FlowFiles can be processed in an order that is most effective for a particular use case.

- AuthorityProvider
  * An AuthorityProvider is responsible for determining which privileges and roles, if any, a given user should be granted.


[[processor_api]]
== Processor API

The Processor is the most widely used Component available in NiFi. Processors are the only Component to which access is given to create, remove, modify, or inspect FlowFiles (data and attributes).

All Processors are loaded and instantiated using Java's ServiceLoader mechanism. This means that all Processors must adhere to the following rules:

- The Processor must have a default constructor.
- The Processor's JAR file must contain an entry in the META-INF/services directory named `org.apache.nifi.processor.Processor`. This is a text file where each line contains the fully-qualified class name of a Processor.

While `Processor` is an interface that can be implemented directly, it will be extremely rare to do so, as the `org.apache.nifi.processor.AbstractProcessor` is the base class for almost all Processor implementations. The `AbstractProcessor` class provides a significant amount of functionality, which makes the task of developing a Processor much easier and more convenient. For the scope of this document, we will focus primarily on the `AbstractProcessor` class when dealing with the Processor API.

.Concurrency Note
NiFi is a highly concurrent framework. This means that all extensions must be thread-safe. If you are unfamiliar with writing concurrent software in Java, it is highly recommended that you familiarize yourself with the principles of Java concurrency.


[[supporting_api]]
=== Supporting API

In order to understand the Processor API, we must first understand - at least at a high level - several supporting classes and interfaces, which are discussed below.

[[flowfile]]
==== FlowFile
A FlowFile is a logical notion that correlates a piece of data with a set of Attributes about that data. Such attributes include a FlowFile's unique identifier, as well as its name, size, and any number of other flow-specific values. While the contents and attributes of a FlowFile can change, the FlowFile object is immutable. Modifications to a FlowFile are made possible by the ProcessSession.

[[process_session]]
==== ProcessSession
The ProcessSession, often referred to as simply a "session," provides a mechanism by which FlowFiles can be created, destroyed, examined, cloned, and transferred to other Processors. Additionally, a ProcessSession provides a mechanism for creating modified versions of FlowFiles, by adding or removing attributes, or by modifying the FlowFile's content. The ProcessSession also exposes a mechanism for emitting provenance events that provide the ability to track the lineage and history of a FlowFile. After operations are performed on one or more FlowFiles, a ProcessSession can be either committed or rolled back.
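
As a brief, hypothetical sketch of how a session is typically used (the `session` object and the `REL_SUCCESS` Relationship constant shown here come from the Processor concepts discussed later in this guide):

[source,java]
----
FlowFile flowFile = session.get();  // obtain a FlowFile from an incoming queue
if (flowFile == null) {
    return;  // no FlowFile is available; nothing to do
}

// FlowFiles are immutable: modifying one returns a new version,
// so the reference must be updated.
flowFile = session.putAttribute(flowFile, "my.attribute", "my value");

// Transfer the latest version of the FlowFile to a Relationship.
session.transfer(flowFile, REL_SUCCESS);
----
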
[[process_context]]
==== ProcessContext
The ProcessContext provides a bridge between a Processor and the framework. It provides information about how the Processor is currently configured and allows the Processor to perform Framework-specific tasks, such as yielding its resources so that the framework will schedule other Processors to run without consuming resources unnecessarily.


[[property_descriptor]]
==== PropertyDescriptor
PropertyDescriptor defines a property that is to be used by a Processor, ReportingTask, or ControllerService. The definition of a property includes its name, a description of the property, an optional default value, validation logic, and an indicator as to whether or not the property is required in order for the Processor to be valid. PropertyDescriptors are created by instantiating an instance of the `PropertyDescriptor.Builder` class, calling the appropriate methods to fill in the details about the property, and finally calling the `build` method.


[[validator]]
==== Validator
A PropertyDescriptor may specify one or more Validators that can be used to ensure that the user-entered value for a property is valid. If a Validator indicates that a property value is invalid, the Component will not be able to be run or used until the property becomes valid.


[[validation_context]]
==== ValidationContext
When validating property values, a ValidationContext can be used to obtain ControllerServices, create PropertyValue objects, and compile and evaluate property values using the Expression Language.


[[property_value]]
==== PropertyValue
All property values returned to a Processor are returned in the form of a PropertyValue object. This object has convenience methods for converting the value from a String to other forms, such as numbers and time periods, as well as providing an API for evaluating the Expression Language.


[[relationship]]
==== Relationship
Relationships define the routes to which a FlowFile may be transferred from a Processor. Relationships are created by instantiating an instance of the `Relationship.Builder` class, calling the appropriate methods to fill in the details of the Relationship, and finally calling the `build` method.


[[processor_initialization_context]]
==== ProcessorInitializationContext
After a Processor is created, its `initialize` method will be called with a `ProcessorInitializationContext` object. This object exposes configuration to the Processor that will not change throughout the life of the Processor, such as the unique identifier of the Processor.

[[ProcessorLog]]
==== ProcessorLog
Processors are encouraged to perform their logging via the `ProcessorLog` interface, rather than obtaining a direct instance of a third-party logger. This is because logging via the ProcessorLog allows the framework to render log messages that exceed a configurable severity level in the User Interface, allowing those who monitor the dataflow to be notified when important events occur. Additionally, it provides a consistent logging format for all Processors by logging stack traces when in DEBUG mode and providing the Processor's unique identifier in log messages.


[[AbstractProcessor]]
=== AbstractProcessor API

Since the vast majority of Processors will be created by extending the AbstractProcessor, it is the abstract class that we will examine in this section. The AbstractProcessor provides several methods that will be of interest to Processor developers.
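
Before walking through the individual methods, it may help to see them together. The following is a minimal sketch of a Processor built on `AbstractProcessor`; the class name, property, and relationship are illustrative only:

[source,java]
----
import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder()
        .name("My Property")
        .description("An example property")
        .required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("All FlowFiles are routed to this relationship")
        .build();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return Collections.singletonList(MY_PROPERTY);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // A real Processor would examine or modify the FlowFile here.
        session.transfer(flowFile, REL_SUCCESS);
    }
}
----
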
==== Processor Initialization

When a Processor is created, before any other methods are invoked, the `init` method of the AbstractProcessor will be invoked. The method takes a single argument, which is of type `ProcessorInitializationContext`. The context object supplies the Processor with a ProcessorLog, the Processor's unique identifier, and a ControllerServiceLookup that can be used to interact with the configured ControllerServices. Each of these objects is stored by the AbstractProcessor and may be obtained by subclasses via the `getLogger`, `getIdentifier`, and `getControllerServiceLookup` methods, respectively.


==== Exposing Processor's Relationships

In order for a Processor to transfer a FlowFile to a new destination for follow-on processing, the Processor must first be able to expose to the Framework all of the Relationships that it currently supports. This allows users of the application to connect Processors to one another by creating Connections between Processors and assigning the appropriate Relationships to those Connections.

A Processor exposes the valid set of Relationships by overriding the `getRelationships` method. This method takes no arguments and returns a `Set` of `Relationship` objects. For most Processors, this Set will be static, but other Processors will generate the Set dynamically, based on user configuration. For those Processors for which the Set is static, it is advisable to create an immutable Set in the Processor's constructor or init method and return that value, rather than dynamically generating the Set. This pattern lends itself to cleaner code and better performance.


==== Exposing Processor Properties

Most Processors will require some amount of user configuration before they are able to be used. The properties that a Processor supports are exposed to the Framework via the `getSupportedPropertyDescriptors` method. This method takes no arguments and returns a `List` of `PropertyDescriptor` objects. The order of the objects in the List is important in that it dictates the order in which the properties will be rendered in the User Interface.

A `PropertyDescriptor` object is constructed by creating a new instance of the `PropertyDescriptor.Builder` object, calling the appropriate methods on the builder, and finally calling the `build` method.

While this method covers most of the use cases, it is sometimes desirable to allow users to configure additional properties whose names are not known. This can be achieved by overriding the `getSupportedDynamicPropertyDescriptor` method. This method takes a `String` as its only argument, which indicates the name of the property. The method returns a `PropertyDescriptor` object that can be used to validate both the name of the property, as well as the value. Any PropertyDescriptor that is returned from this method should be built setting the value of `isDynamic` to true in the `PropertyDescriptor.Builder` class. The default behavior of AbstractProcessor is to not allow any dynamically created properties.


==== Validating Processor Properties

A Processor is not able to be started if its configuration is not valid. Validation of a Processor property can be achieved by setting a Validator on a PropertyDescriptor or by restricting the allowable values for a property via the PropertyDescriptor.Builder's `allowableValues` method or `identifiesControllerService` method.
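
For example, a property that must be a valid time period might be declared as follows. The property name and default value are hypothetical; `StandardValidators` provides a number of commonly needed Validators:

[source,java]
----
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    .name("Connection Timeout")
    .description("The maximum amount of time to wait when connecting to the remote system")
    .required(true)
    .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)  // e.g., "30 sec", "5 mins"
    .defaultValue("30 sec")
    .build();
----
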
There are times, though, when validating a Processor's properties individually is not sufficient. For this purpose, the AbstractProcessor exposes a `customValidate` method. The method takes a single argument of type `ValidationContext`. The return value of this method is a `Collection` of `ValidationResult` objects that describe any problems that were found during validation. Only those ValidationResult objects whose `isValid` method returns `false` should be returned. This method will be invoked only if all properties are valid according to their associated Validators and Allowable Values. That is, this method will be called only if all properties are valid in and of themselves, and this method allows for validation of a Processor's configuration as a whole.


==== Responding to Changes in Configuration

It is sometimes desirable to have a Processor eagerly react when its properties are changed. The `onPropertyModified` method allows a Processor to do just that. When a user changes the property values for a Processor, the `onPropertyModified` method will be called for each modified property. The method takes three arguments: the PropertyDescriptor that indicates which property was modified, the old value, and the new value. If the property had no previous value, the second argument will be `null`. If the property was removed, the third argument will be `null`. It is important to note that this method will be called regardless of whether or not the values are valid. This method will be called only when a value is actually modified, rather than being called when a user updates a Processor without changing its value. At the point that this method is invoked, it is guaranteed that the thread invoking this method is the only thread currently executing code in the Processor, unless the Processor itself creates its own threads.


==== Performing the Work

When a Processor has work to do, it is scheduled to do so by having its `onTrigger` method called by the framework. The method takes two arguments: a `ProcessContext` and a `ProcessSession`. The first step in the `onTrigger` method is often to obtain a FlowFile on which the work is to be performed by calling one of the `get` methods on the ProcessSession. For Processors that ingest data into NiFi from external sources, this step is skipped. The Processor is then free to examine FlowFile attributes; add, remove, or modify attributes; read or modify FlowFile content; and transfer FlowFiles to the appropriate Relationships.


==== When Processors are Triggered

A Processor's `onTrigger` method will be called only when it is scheduled to run and when work exists for the Processor. Work is said to exist for a Processor if any of the following conditions is met:

- A Connection whose destination is the Processor has at least one FlowFile in its queue
- The Processor has no incoming Connections
- The Processor is annotated with the @TriggerWhenEmpty annotation

Several factors exist that will contribute to when a Processor's `onTrigger` method is invoked. First, the Processor will not be triggered unless a user has configured the Processor to run. If a Processor is scheduled to run, the Framework periodically (the period is configured by users in the User Interface) checks if there is work for the Processor to do, as described above. If so, the Framework will check downstream destinations of the Processor.
If any of the Processor's outbound Connections is full, by default, the Processor will not be scheduled to run.

However, the `@TriggerWhenAnyDestinationAvailable` annotation may be added to the Processor's class. In this case, the requirement is changed so that only one downstream destination must be "available" (a destination is considered "available" if the Connection's queue is not full), rather than requiring that all downstream destinations be available.

Also related to Processor scheduling is the `@TriggerSerially` annotation. Processors that use this Annotation will never have more than one thread running the `onTrigger` method simultaneously. It is crucial to note, though, that the thread executing the code may change from invocation to invocation. Therefore, care must still be taken to ensure that the Processor is thread-safe!


=== Component Lifecycle

The NiFi API provides lifecycle support through use of Java Annotations. The `org.apache.nifi.annotation.lifecycle` package contains several annotations for lifecycle management. The following Annotations may be applied to Java methods in a NiFi component to indicate to the framework when the methods should be called. For the discussion of Component Lifecycle, we will define a NiFi component as a Processor, ControllerService, or ReportingTask.

==== @OnAdded

The `@OnAdded` annotation causes a method to be invoked as soon as a component is created. The component's `initialize` method (or `init` method, if the component subclasses `AbstractProcessor`) will be invoked after the component is constructed, followed by methods that are annotated with `@OnAdded`. If any method annotated with `@OnAdded` throws an Exception, an error will be returned to the user, and that component will not be added to the flow. Furthermore, other methods with this Annotation will not be invoked. This method will be called only once for the lifetime of a component. Methods with this Annotation must take zero arguments.


==== @OnRemoved

The `@OnRemoved` annotation causes a method to be invoked before a component is removed from the flow. This allows resources to be cleaned up before removing a component. Methods with this annotation must take zero arguments. If a method with this annotation throws an Exception, the component will still be removed.

==== @OnScheduled

This annotation indicates that a method should be called every time the component is scheduled to run. Because ControllerServices are not scheduled, using this annotation on a ControllerService does not make sense and will not be honored. It should be used only for Processors and Reporting Tasks. If any method with this annotation throws an Exception, other methods with this annotation will not be invoked, and a notification will be presented to the user. In this case, methods annotated with `@OnUnscheduled` are then triggered, followed by methods with the `@OnStopped` annotation (during this state, if any of these methods throws an Exception, those Exceptions are ignored). The component will then yield its execution for some period of time, referred to as the "Administrative Yield Duration," which is a value that is configured in the `nifi.properties` file. Finally, the process will start again, until all of the methods annotated with `@OnScheduled` have returned without throwing any Exception. Methods with this annotation may take zero arguments or may take a single argument.
If the single argument variation is used, the argument must be of type `ProcessContext` if the component is a Processor or `ConfigurationContext` if the component is a ReportingTask.

==== @OnUnscheduled

Methods with this annotation will be called whenever a Processor or ReportingTask is no longer scheduled to run. At that time, many threads may still be active in the Processor's `onTrigger` method. If such a method throws an Exception, a log message will be generated, the Exception will otherwise be ignored, and other methods with this annotation will still be invoked. Methods with this annotation may take zero arguments or may take a single argument. If the single argument variation is used, the argument must be of type `ProcessContext` if the component is a Processor or `ConfigurationContext` if the component is a ReportingTask.


==== @OnStopped

Methods with this annotation will be called when a Processor or ReportingTask is no longer scheduled to run and all threads have returned from the `onTrigger` method. If such a method throws an Exception, a log message will be generated, and the Exception will otherwise be ignored; other methods with this annotation will still be invoked. Methods with this annotation must take zero arguments.


==== @OnShutdown

Any method that is annotated with the `@OnShutdown` annotation will be called when NiFi is successfully shut down. If such a method throws an Exception, a log message will be generated, the Exception will otherwise be ignored, and other methods with this annotation will still be invoked. Methods with this annotation must take zero arguments. Note: while NiFi will attempt to invoke methods with this annotation on all components that use it, this is not always possible. For example, the process may be killed unexpectedly, in which case it does not have a chance to invoke these methods. Therefore, while methods using this annotation can be used to clean up resources, for instance, they should not be relied upon for critical business logic.


=== Reporting Processor Activity

Processors are responsible for reporting their activity so that users are able to understand what happens to their data. Processors should log events via the ProcessorLog, which is accessible via the InitializationContext or by calling the `getLogger` method of `AbstractProcessor`.

Additionally, Processors should use the `ProvenanceReporter` interface, obtained via the ProcessSession's `getProvenanceReporter` method. The ProvenanceReporter should be used to indicate any time that content is received from an external source or sent to an external location. The ProvenanceReporter also has methods for reporting when a FlowFile is cloned, forked, or modified, when multiple FlowFiles are merged into a single FlowFile, and when a FlowFile is associated with some other identifier. However, these functions are less critical to report, as the framework is able to detect these things and emit appropriate events on the Processor's behalf. Yet, it is a best practice for the Processor developer to emit these events, as it becomes explicit in the code that these events are being emitted, and the developer is able to provide additional details to the events, such as the amount of time that the action took or pertinent information about the action that was taken. If the Processor emits an event, the framework will not emit a duplicate event.
Instead, it always assumes that the Processor developer knows what is happening in the context of the Processor better than the framework does. The framework may, however, emit a different event. For example, if a Processor modifies both the content of a FlowFile and its attributes and then emits only an ATTRIBUTES_MODIFIED event, the framework will emit a CONTENT_MODIFIED event. The framework will not emit an ATTRIBUTES_MODIFIED event if any other event is emitted for that FlowFile (either by the Processor or the framework). This is because all Provenance Events record the attributes of the FlowFile as they were before the event occurred, as well as the attributes that resulted from the processing of that FlowFile; as a result, the ATTRIBUTES_MODIFIED event is generally considered redundant and would make the rendering of the FlowFile lineage very verbose. It is, however, acceptable for a Processor to emit this event along with others, if the event is considered pertinent from the perspective of the Processor.


== Documenting a Component

NiFi attempts to make the user experience as simple and convenient as possible by providing a significant amount of documentation to the user from within the NiFi application itself via the User Interface. In order for this to happen, of course, Processor developers must provide that documentation to the framework. NiFi exposes a few different mechanisms for supplying documentation to the framework.


=== Documenting Properties

Individual properties can be documented by calling the `description` method of a PropertyDescriptor's builder as such:

[source,java]
----
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder()
    .name("My Property")
    .description("Description of the Property")
    ...
    .build();
----

If the property is to provide a set of allowable values, those values are presented to the user in a drop-down field in the UI. Each of those values can also be given a description:

[source,java]
----
public static final AllowableValue EXTENSIVE = new AllowableValue("Extensive", "Extensive",
    "Everything will be logged - use with caution!");
public static final AllowableValue VERBOSE = new AllowableValue("Verbose", "Verbose",
    "Quite a bit of logging will occur");
public static final AllowableValue REGULAR = new AllowableValue("Regular", "Regular",
    "Typical logging will occur");

public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder()
    .name("Amount to Log")
    .description("How much the Processor should log")
    .allowableValues(REGULAR, VERBOSE, EXTENSIVE)
    .defaultValue(REGULAR.getValue())
    ...
    .build();
----


=== Documenting Relationships

Processor Relationships are documented in much the same way that properties are - by calling the `description` method of a Relationship's builder:

[source,java]
----
public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
    .name("My Relationship")
    .description("This relationship is used only if the Processor fails to process the data.")
    .build();
----


=== Documenting Capability and Keywords

The `org.apache.nifi.annotation.documentation` package provides Java annotations that can be used to document components. The CapabilityDescription annotation can be added to a Processor, Reporting Task, or Controller Service and is intended to provide a brief description of the functionality provided by the component.
The Tags annotation has a `value` variable that is defined to be an Array of Strings. As such, it is used by providing multiple values as a comma-separated list of ++String++s within curly braces. These values are then incorporated into the UI by allowing users to filter the components based on a tag (i.e., a keyword). Additionally, the UI provides a tag cloud that allows users to select the tags that they want to filter by. The tags that are largest in the cloud are those tags that exist the most on the components in that instance of NiFi. An example of using these annotations is provided below:

[source,java]
----
@Tags({"example", "documentation", "developer guide", "processor", "tags"})
@CapabilityDescription("Example Processor that provides no real functionality but is provided"
    + " for an example in the Developer Guide")
public class ExampleProcessor extends AbstractProcessor {
    ...
}
----

=== Documenting FlowFile Attribute Interaction

Many times a processor will expect certain FlowFile attributes to be set on inbound FlowFiles in order for the processor to function properly. In other cases a processor may update or create FlowFile attributes on the outbound FlowFile. Processor developers may document both of these behaviors using the `ReadsAttribute` and `WritesAttribute` documentation annotations. These annotations are used to generate documentation that gives users a better understanding of how a processor will interact with the flow.

Note: Because Java 7 does not support repeated annotations on a type, you may need to use `ReadsAttributes` and `WritesAttributes` to indicate that a processor reads or writes multiple FlowFile attributes. These annotations can only be applied to Processors. An example is listed below:

[source,java]
----
@WritesAttributes({ @WritesAttribute(attribute = "invokehttp.status.code", description = "The status code that is returned"),
    @WritesAttribute(attribute = "invokehttp.status.message", description = "The status message that is returned"),
    @WritesAttribute(attribute = "invokehttp.response.body", description = "The response body"),
    @WritesAttribute(attribute = "invokehttp.request.url", description = "The request URL"),
    @WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"),
    @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server") })
public final class InvokeHTTP extends AbstractProcessor {
----

=== Documenting Related Components

Often Processors and ControllerServices are related to one another. Sometimes it's a put/get relation, as in `PutFile` and `GetFile`. Sometimes a Processor uses a ControllerService, like `InvokeHTTP` and `StandardSSLContextService`. Sometimes one ControllerService uses another, like `DistributedMapCacheClientService` and `DistributedMapCacheServer`. Developers of these extension points may relate these different components using the `SeeAlso` annotation, which links the components in the documentation. `SeeAlso` can be applied to Processors, ControllerServices, and ReportingTasks. An example of how to do this is listed below:

[source,java]
----
@SeeAlso(GetFile.class)
public class PutFile extends AbstractProcessor {
----

=== Advanced Documentation

When the documentation methods above are not sufficient, NiFi provides the ability to expose more advanced documentation to the user via the "Usage" documentation.
When a user right-clicks on a Processor, NiFi provides a "Usage" menu item in the context menu. Additionally, the UI exposes a "Help" link in the top-right corner, from which the same Usage information can be found.

The advanced documentation of a Processor is provided as an HTML file named `additionalDetails.html`. This file should exist within a directory whose name is the fully-qualified name of the Processor, and this directory's parent should be named `docs` and exist in the root of the Processor's jar. This file will be linked from a generated HTML file that will contain all the Capability, Keyword, PropertyDescription, and Relationship information, so it will not be necessary to duplicate that. This is a place to provide a rich explanation of what this Processor is doing, what kind of data it expects and produces, and what FlowFile attributes it expects and produces. Because this documentation is in an HTML format, you may include images and tables to best describe this component. The same methods can be used to provide advanced documentation for Processors, ControllerServices, and ReportingTasks.


== Common Processor Patterns

While there are many different Processors available to NiFi users, the vast majority of them fall into one of several common design patterns. Below, we discuss these patterns, when the patterns are appropriate, reasons we follow these patterns, and things to watch out for when applying such patterns. Note that the patterns and recommendations discussed below are general guidelines and not hardened rules.


[[ingress]]
=== Data Ingress

A Processor that ingests data into NiFi has a single Relationship named `success`. This Processor generates new FlowFiles via the ProcessSession `create` method and does not pull FlowFiles from incoming Connections. The Processor name starts with "Get" or "Listen," depending on whether it polls an external source or exposes some interface to which external sources can connect. The name ends with the protocol used for communications. Processors that follow this pattern include `GetFile`, `GetSFTP`, `ListenHTTP`, and `GetHTTP`.

This Processor may create or initialize a Connection Pool in a method that uses the `@OnScheduled` annotation. However, because communications problems may prevent connections from being established or cause connections to be terminated, connections themselves are not created at this point. Rather, the connections are created or leased from the pool in the `onTrigger` method.

The `onTrigger` method of this Processor begins by leasing a connection from the Connection Pool, if possible, or otherwise creates a connection to the external service. When no data is available from the external source, the `yield` method of the ProcessContext is called by the Processor and the method returns so that this Processor avoids continually running and depleting resources without benefit. Otherwise, this Processor then creates a FlowFile via the ProcessSession's `create` method and assigns an appropriate filename and path to the FlowFile (by adding the `filename` and `path` attributes), as well as any other attributes that may be appropriate. An OutputStream to the FlowFile's content is obtained via the ProcessSession's `write` method, passing a new OutputStreamCallback (which is usually an anonymous inner class).
From within this callback, the Processor is able to write to the FlowFile, streaming the content from the external resource to the FlowFile's OutputStream. If the desire is to write the entire contents of an InputStream to the FlowFile, the `importFrom` method of ProcessSession may be more convenient to use than the `write` method.

When this Processor expects to receive many small files, it may be advisable to create several FlowFiles from a single session before committing the session. Typically, this allows the Framework to treat the content of the newly created FlowFiles much more efficiently.

This Processor generates a Provenance event indicating that it has received data and specifies from where the data came. This Processor should log the creation of the FlowFile so that the FlowFile's origin can be determined by analyzing logs, if necessary.

This Processor acknowledges receipt of the data and/or removes the data from the external source in order to prevent receipt of duplicate files. *This is done only after the ProcessSession by which the FlowFile was created has been committed!* Failure to adhere to this principle may result in data loss, as restarting NiFi before the session has been committed will result in the temporary file being deleted. Note, however, that it is possible using this approach to receive duplicate data because the application could be restarted after committing the session and before acknowledging or removing the data from the external source. In general, though, potential data duplication is preferred over potential data loss. The connection is finally returned or added to the Connection Pool, depending on whether the connection was leased from the Connection Pool to begin with or was created in the `onTrigger` method.

If there is a communications problem, the connection is typically terminated and not returned (or added) to the Connection Pool. Connections to remote systems are torn down and the Connection Pool shutdown in a method annotated with the `@OnStopped` annotation so that resources can be reclaimed.


=== Data Egress

A Processor that publishes data to an external source has two Relationships: `success` and `failure`. The Processor name starts with "Put" followed by the protocol that is used for data transmission. Processors that follow this pattern include `PutEmail`, `PutSFTP`, and `PostHTTP` (note that the name does not begin with "Put" because this would lead to confusion, since PUT and POST have special meanings when dealing with HTTP).

This Processor may create or initialize a Connection Pool in a method that uses the `@OnScheduled` annotation. However, because communications problems may prevent connections from being established or cause connections to be terminated, connections themselves are not created at this point. Rather, the connections are created or leased from the pool in the `onTrigger` method.

The `onTrigger` method first obtains a FlowFile from the ProcessSession via the `get` method. If no FlowFile is available, the method returns without obtaining a connection to the remote resource.

If at least one FlowFile is available, the Processor obtains a connection from the Connection Pool, if possible, or otherwise creates a new connection. If the Processor is neither able to lease a connection from the Connection Pool nor create a new connection, the FlowFile is routed to `failure`, the event is logged, and the method returns.
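
These first steps might be sketched as follows; `Connection` and `connectionPool` stand in for whatever client library is actually in use, and `REL_FAILURE` is the Processor's `failure` Relationship:

[source,java]
----
FlowFile flowFile = session.get();
if (flowFile == null) {
    return;  // no FlowFile to send; do not bother obtaining a connection
}

final Connection connection;
try {
    // lease a connection from the hypothetical pool, or create a new one
    connection = connectionPool.lease();
} catch (final IOException e) {
    getLogger().error("Could not connect to remote system; routing {} to failure: {}",
        new Object[] {flowFile, e});
    session.transfer(flowFile, REL_FAILURE);
    return;
}
----
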
If a connection was obtained, the Processor obtains an InputStream to the FlowFile's content by invoking the `read` method on the ProcessSession and passing an InputStreamCallback (which is often an anonymous inner class) and from within that callback transmits the contents of the FlowFile to the destination. The event is logged along with the amount of time taken to transfer the file and the data rate at which the file was transferred. A SEND event is reported to the ProvenanceReporter by obtaining the reporter from the ProcessSession via the `getProvenanceReporter` method and calling the `send` method on the reporter. The connection is returned or added to the Connection Pool, depending on whether the connection was leased from the pool or newly created by the `onTrigger` method.

If there is a communications problem, the connection is typically terminated and not returned (or added) to the Connection Pool. If there is an issue sending the data to the remote resource, the desired approach for handling the error depends on a few considerations. If the issue is related to a network condition, the FlowFile is generally routed to `failure`. The FlowFile is not penalized because there is not necessarily a problem with the data. Unlike the case of the <<ingress>> Processor, we typically do not call `yield` on the ProcessContext. This is because in the case of ingest, the FlowFile does not exist until the Processor is able to perform its function. However, in the case of a Put Processor, the DataFlow Manager may choose to route `failure` to a different Processor. This can allow for a "backup" system to be used in the case of problems with one system or can be used for load distribution across many systems.

If a problem occurs that is data-related, one of two approaches should be taken. First, if the problem is likely to sort itself out, the FlowFile is penalized and then routed to `failure`. This is the case, for instance, with PutFTP, when a FlowFile cannot be transferred because of a file naming conflict. The presumption is that the file will eventually be removed from the directory so that the new file can be transferred. As a result, we penalize the FlowFile and route to `failure` so that we can try again later. In the other case, if there is an actual problem with the data (such as the data does not conform to some required specification), a different approach may be taken. In this case, it may be advantageous to break apart the `failure` relationship into a `failure` and a `communications failure` relationship. This allows the DataFlow Manager to determine how to handle each of these cases individually. It is important in these situations to document the differences between the two Relationships well, clarifying each in the "description" when creating the Relationship.

Connections to remote systems are torn down and the Connection Pool shutdown in a method annotated with `@OnStopped` so that resources can be reclaimed.


=== Route Based on Content (One-to-One)

A Processor that routes data based on its content will take one of two forms: Route an incoming FlowFile to exactly one destination, or route incoming data to 0 or more destinations. Here, we will discuss the first case.

This Processor has two relationships: `matched` and `unmatched`. If a particular data format is expected, the Processor will also have a `failure` relationship that is used when the input is not of the expected format.
The Processor exposes a Property that indicates the routing criteria.

If the Property that specifies routing criteria requires processing, such as compiling a Regular Expression, this processing is done in a method annotated with `@OnScheduled`, if possible. The result is then stored in a member variable that is marked as `volatile`.

The `onTrigger` method obtains a single FlowFile. The method reads the contents of the FlowFile via the ProcessSession's `read` method, evaluating the Match Criteria as the data is streamed. The Processor then determines whether the FlowFile should be routed to `matched` or `unmatched` based on whether or not the criteria matched, and routes the FlowFile to the appropriate relationship.

The Processor then emits a Provenance ROUTE event indicating the Relationship to which the Processor routed the FlowFile.

This Processor is annotated with the `@SideEffectFree` and `@SupportsBatching` annotations from the `org.apache.nifi.annotation.behavior` package.


=== Route Based on Content (One-to-Many)

If a Processor will route a single FlowFile to potentially many relationships, this Processor will be slightly different from the above-described Processor for Routing Data Based on Content. This Processor typically has Relationships that are dynamically defined by the user as well as an `unmatched` relationship.

In order for the user to be able to define additional Properties, the `getSupportedDynamicPropertyDescriptor` method must be overridden. This method returns a PropertyDescriptor with the supplied name and an applicable Validator to ensure that the user-specified Matching Criteria is valid.

In this Processor, the Set of Relationships that is returned by the `getRelationships` method is a member variable that is marked `volatile`. This Set is initially constructed with a single Relationship named `unmatched`. The `onPropertyModified` method is overridden so that when a Property is added or removed, a new Relationship is created with the same name. If the Processor has Properties that are not user-defined, it is important to check if the specified Property is user-defined. This can be achieved by calling the `isDynamic` method of the PropertyDescriptor that is passed to this method. If this Property is dynamic, a new Set of Relationships is then created, and the previous set of Relationships is copied into it. This new Set either has the newly created Relationship added to it or removed from it, depending on whether a new Property was added to the Processor or a Property was removed (Property removal is detected by checking whether the third argument to this method is `null`). The member variable holding the Set of Relationships is then updated to point to this new Set.

If the Properties that specify routing criteria require processing, such as compiling a Regular Expression, this processing is done in a method annotated with `@OnScheduled`, if possible. The result is then stored in a member variable that is marked as `volatile`. This member variable is generally of type `Map` where the key is of type `Relationship` and the value's type is defined by the result of processing the property value.

The `onTrigger` method obtains a FlowFile via the `get` method of ProcessSession. If no FlowFile is available, it returns immediately. Otherwise, a Set of type Relationship is created.
The method reads the contents of the FlowFile via the ProcessSession's `read` method, evaluating each of the Match Criteria as the data is streamed. For any criteria that matches, the relationship associated with that Match Criteria is added to the Set of Relationships.

After reading the contents of the FlowFile, the method checks if the Set of Relationships is empty. If so, the original FlowFile has an attribute added to it to indicate the Relationship to which it was routed and is routed to the `unmatched` Relationship. This is logged, a Provenance ROUTE event is emitted, and the method returns. If the size of the Set is equal to 1, the original FlowFile has an attribute added to it to indicate the Relationship to which it was routed and is routed to the Relationship specified by the entry in the Set. This is logged, a Provenance ROUTE event is emitted for the FlowFile, and the method returns.

In the event that the Set contains more than 1 Relationship, the Processor creates a clone of the FlowFile for each Relationship, except for the first. This is done via the `clone` method of the ProcessSession. There is no need to report a CLONE Provenance Event, as the framework will handle this for you. The original FlowFile and each clone are routed to their appropriate Relationship, with an attribute indicating the name of the Relationship. A Provenance ROUTE event is emitted for each FlowFile. This is logged, and the method returns.

This Processor is annotated with the `@SideEffectFree` and `@SupportsBatching` annotations from the `org.apache.nifi.annotation.behavior` package.


=== Route Streams Based on Content (One-to-Many)

The previous description of Route Based on Content (One-to-Many) provides an abstraction for creating a very powerful Processor. However, it assumes that each FlowFile will be routed in its entirety to zero or more Relationships. What if the incoming data format is a "stream" of many different pieces of information - and we want to send different pieces of this stream to different Relationships? For example, imagine that we want to have a RouteCSV Processor such that it is configured with multiple Regular Expressions. If a line in the CSV file matches a Regular Expression, that line should be included in the outbound FlowFile to the associated relationship. If a Regular Expression is associated with the Relationship "has-apples" and that Regular Expression matches 1,000 of the lines in the FlowFile, there should be one outbound FlowFile for the "has-apples" relationship that has 1,000 lines in it. If a different Regular Expression is associated with the Relationship "has-oranges" and that Regular Expression matches 50 lines in the FlowFile, there should be one outbound FlowFile for the "has-oranges" relationship that has 50 lines in it. I.e., one FlowFile comes in and two FlowFiles come out. The two FlowFiles may contain some of the same lines of text from the original FlowFile, or they may be entirely different. This is the type of Processor that we will discuss in this section.

This Processor's name starts with "Route" and ends with the name of the data type that it routes. In our example here, we are routing CSV data, so the Processor is named RouteCSV. This Processor supports dynamic properties. Each user-defined property has a name that maps to the name of a Relationship. The value of the Property is in the format necessary for the "Match Criteria."
In our example, the value of the property must be a valid Regular Expression.

This Processor maintains an internal `ConcurrentMap` where the key is a `Relationship` and the value is of a type dependent on the format of the Match Criteria. In our example, we would maintain a `ConcurrentMap<Relationship, Pattern>`. This Processor overrides the `onPropertyModified` method. If the new value supplied to this method (the third argument) is null, the Relationship whose name is defined by the property name (the first argument) is removed from the ConcurrentMap. Otherwise, the new value is processed (in our example, by calling `Pattern.compile(newValue)`) and this value is added to the ConcurrentMap with the key again being the Relationship whose name is specified by the property name.

This Processor will override the `customValidate` method. In this method, it will retrieve all Properties from the `ValidationContext` and count the number of PropertyDescriptors that are dynamic (by calling `isDynamic()` on the PropertyDescriptor). If the number of dynamic PropertyDescriptors is 0, this indicates that the user has not added any Relationships, so the Processor returns a `ValidationResult` indicating that the Processor is not valid because it has no Relationships added.

The Processor returns all of the Relationships specified by the user when its `getRelationships` method is called and will also return an `unmatched` Relationship. Because this Processor will have to read and write to the Content Repository (which can be relatively expensive), if this Processor is expected to be used for very high data volumes, it may be advantageous to add a Property that allows the user to specify whether or not they care about the data that does not match any of the Match Criteria.

When the `onTrigger` method is called, the Processor obtains a FlowFile via `ProcessSession.get`. If no data is available, the Processor returns. Otherwise, the Processor creates a `Map<Relationship, FlowFile>`. We will refer to this Map as `flowFileMap`. The Processor reads the incoming FlowFile by calling `ProcessSession.read` and provides an `InputStreamCallback`. From within the Callback, the Processor reads the first piece of data from the FlowFile. The Processor then evaluates each of the Match Criteria against this piece of data. If a particular criteria (in our example, a Regular Expression) matches, the Processor obtains the FlowFile from `flowFileMap` that belongs to the appropriate Relationship. If no FlowFile yet exists in the Map for this Relationship, the Processor creates a new FlowFile by calling `session.create(incomingFlowFile)` and then adds the new FlowFile to `flowFileMap`. The Processor then writes this piece of data to the FlowFile by calling `session.append` with an `OutputStreamCallback`. From within this OutputStreamCallback, we have access to the new FlowFile's OutputStream, so we are able to write the data to the new FlowFile. We then return from the OutputStreamCallback. After iterating over each of the Match Criteria, if none of them match, we perform the same routines as above for the `unmatched` relationship (unless the user configures us to not write out unmatched data). Now that we have called `session.append`, we have a new version of the FlowFile. As a result, we need to update our `flowFileMap` to associate the Relationship with the new FlowFile.
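
Condensed into code, the heart of that InputStreamCallback might look like the following sketch. It assumes Regular Expression Match Criteria evaluated line by line; `patterns` is the `ConcurrentMap<Relationship, Pattern>` described above, and `flowFileMap` and `incomingFlowFile` are the variables from the preceding paragraph:

[source,java]
----
// Within the InputStreamCallback passed to session.read(incomingFlowFile):
final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
String line;
while ((line = reader.readLine()) != null) {
    for (final Map.Entry<Relationship, Pattern> entry : patterns.entrySet()) {
        if (!entry.getValue().matcher(line).matches()) {
            continue;
        }

        FlowFile flowFile = flowFileMap.get(entry.getKey());
        if (flowFile == null) {
            // first match for this Relationship: create the outbound FlowFile
            flowFile = session.create(incomingFlowFile);
        }

        final String matchedLine = line;  // must be final for the anonymous class
        flowFile = session.append(flowFile, new OutputStreamCallback() {
            @Override
            public void process(final OutputStream out) throws IOException {
                out.write((matchedLine + "\n").getBytes(StandardCharsets.UTF_8));
            }
        });

        // session.append returned a new version of the FlowFile, so update
        // the map to associate the Relationship with that latest version
        flowFileMap.put(entry.getKey(), flowFile);
    }
}
----
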
If at any point, an Exception is thrown, we will need to route the incoming FlowFile to `failure`. We will also need to remove each of the newly created FlowFiles, as we won't be transferring them anywhere. We can accomplish this by calling `session.remove(flowFileMap.values())`. At this point, we will log the error and return.

Otherwise, if all is successful, we can now iterate through the `flowFileMap` and transfer each FlowFile to the corresponding Relationship. The original FlowFile is then either removed or routed to an `original` relationship. For each of the newly created FlowFiles, we also emit a Provenance ROUTE event indicating which Relationship the FlowFile went to. It is also helpful to include in the details of the ROUTE event how many pieces of information were included in this FlowFile. This allows DataFlow Managers to easily see, when looking at the Provenance Lineage view, how many pieces of information went to each of the relationships for a given input FlowFile.

Additionally, some Processors may need to "group" the data that is sent to each Relationship so that each FlowFile that is sent to a relationship has the same value. In our example, we may want to allow the Regular Expression to have a Capturing Group, and if two different lines in the CSV match the Regular Expression but have different values for the Capturing Group, we want them to be added to two different FlowFiles. The matching value could then be added to each FlowFile as an Attribute. This can be accomplished by modifying the `flowFileMap` such that it is defined as `Map<Relationship, Map<T, FlowFile>>` where `T` is the type of the Grouping Function (in our example, the Group would be a `String` because it is the result of evaluating a Regular Expression's Capturing Group).


=== Route Based on Attributes

This Processor is almost identical to the Route Data Based on Content Processors described above. It takes two different forms: One-to-One and One-to-Many, as do the Content-Based Routing Processors. This Processor, however, does not make any call to ProcessSession's `read` method, as it does not read FlowFile content. This Processor is typically very fast, so the `@SupportsBatching` annotation can be very important in this case.


=== Split Content (One-to-Many)

This Processor generally requires no user configuration, with the exception of the size of each Split to create. The `onTrigger` method obtains a FlowFile from its input queues. A List of type FlowFile is created. The original FlowFile is read via the ProcessSession's `read` method, and an InputStreamCallback is used. Within the InputStreamCallback, the content is read until a point is reached at which the FlowFile should be split. If no split is needed, the Callback returns, and the original FlowFile is routed to `success`. In this case, a Provenance ROUTE event is emitted. Typically, ROUTE events are not emitted when routing a FlowFile to `success` because this generates a very verbose lineage that becomes difficult to navigate. However, in this case, the event is useful because we would otherwise expect a FORK event, and the absence of any event is likely to cause confusion. The fact that the FlowFile was not split but was instead transferred to `success` is logged, and the method returns.

If a point is reached at which a FlowFile needs to be split, a new FlowFile is created via the ProcessSession's `create(FlowFile)` method or the `clone(FlowFile, long, long)` method. The next section of code depends on whether the `create` method is used or the `clone` method is used. Both methods are described below. Which solution is appropriate must be determined on a case-by-case basis.

The Create Method is most appropriate when the data will not be directly copied from the original FlowFile to the new FlowFile. For example, if only some of the data will be copied, or if the data will be modified in some way before being copied to the new FlowFile, this method is necessary. However, if the content of the new FlowFile will be an exact copy of a portion of the original FlowFile, the Clone Method is much preferred.

*Create Method*
If using the `create` method, the method is called with the original FlowFile as the argument so that the newly created FlowFile will inherit the attributes of the original FlowFile and a Provenance FORK event will be created by the framework.

The code then enters a `try/finally` block. Within the `finally` block, the newly created FlowFile is added to the List of FlowFiles that have been created. This is done within a `finally` block so that if an Exception is thrown, the newly created FlowFile will be appropriately cleaned up. Within the `try` block, the code initiates a new callback by calling the ProcessSession's `write` method with an OutputStreamCallback. The appropriate data is then copied from the InputStream of the original FlowFile to the OutputStream for the new FlowFile.

*Clone Method*
If the content of the newly created FlowFile is to be only a contiguous subset of the bytes of the original FlowFile, it is preferred to use the `clone(FlowFile, long, long)` method instead of the `create(FlowFile)` method of the ProcessSession. In this case, the offset of the original FlowFile at which the new FlowFile's content should begin is passed as the second argument to the `clone` method. The length of the new FlowFile is passed as the third argument to the `clone` method. For example, if the original FlowFile was 10,000 bytes and we called `clone(flowFile, 500, 100)`, the FlowFile that would be returned to us would be identical to `flowFile` with respect to its attributes. However, the content of the newly created FlowFile would be 100 bytes in length and would start at offset 500 of the original FlowFile. That is, the contents of the newly created FlowFile would be the same as if you had copied bytes 500 through 599 of the original FlowFile.

After the clone has been created, it is added to the List of FlowFiles.

When applicable, this method is greatly preferred over the Create Method because no disk I/O is required. The framework is able to simply create a new FlowFile that references a subset of the original FlowFile's content, rather than actually copying the data. However, this is not always possible. For example, if header information must be copied from the beginning of the original FlowFile and added to the beginning of each Split, then this method is not possible.
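
To make the Clone Method concrete, a single split might be created as in the following sketch, where `splits` is the List of FlowFiles mentioned above:

[source,java]
----
// Create a 100-byte split whose content begins at offset 500 of the
// original FlowFile. No content is copied: the new FlowFile simply
// references a subset of the original FlowFile's content, and it
// inherits the original FlowFile's attributes.
FlowFile split = session.clone(flowFile, 500L, 100L);
splits.add(split);
----
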
*Both Methods*
Regardless of whether the Clone Method or the Create Method is used, the following is applicable:

If at any point in the InputStreamCallback a condition is reached in which processing cannot continue
(for example, the input is malformed), a `ProcessException` should be thrown. The call to the
ProcessSession's `read` method is wrapped in a `try/catch` block where `ProcessException` is caught.
If an Exception is caught, a log message is generated explaining the error. The List of newly created
FlowFiles is removed via the ProcessSession's `remove` method. The original FlowFile is routed to `failure`.

If no problems arise, the original FlowFile is routed to `original` and all newly created FlowFiles
are updated to include the following attributes:

[options="header"]
|===
| Attribute Name | Description
| `split.parent.uuid` | The UUID of the original FlowFile
| `split.index` | A one-up number indicating which FlowFile in the list this is (the first FlowFile
	created will have a value of `0`, the second will have a value of `1`, etc.)
| `split.count` | The total number of split FlowFiles that were created
|===

The newly created FlowFiles are routed to `success`; this event is logged; and the method returns.


=== Update Attributes Based on Content

This Processor is very similar to the Route Based on Content Processors discussed above. Rather than
routing a FlowFile to `matched` or `unmatched`, the FlowFile is generally routed to `success` or `failure`,
and attributes are added to the FlowFile as appropriate. The attributes to be added are configured in a
manner similar to that of the Route Based on Content (One-to-Many) Processor, with the user defining their own
properties. The name of a property indicates the name of an attribute to add. The value of the property
indicates some Matching Criteria to be applied to the data. If the Matching Criteria matches the data,
an attribute is added whose name is the same as that of the Property. The value of the attribute is the
value from the content that matched the criteria.

For example, a Processor that evaluates XPath Expressions may allow user-defined XPaths to be entered.
If an XPath matches the content of a FlowFile, that FlowFile will have an attribute added whose name is
equal to that of the Property and whose value is equal to the textual content of the XML Element or
Attribute that matched the XPath. In this example, the `failure` relationship would be used if the incoming
FlowFile was not valid XML. The `success` relationship would be used regardless of whether or not any
matches were found. This can then be used to route the FlowFile when appropriate.

This Processor emits a Provenance Event of type ATTRIBUTES_MODIFIED.


=== Enrich/Modify Content

The Enrich/Modify Content pattern is very common and very generic. This pattern is responsible for any
general content modification. For the majority of cases, this Processor is marked with the
`@SideEffectFree` and `@SupportsBatching` annotations. The Processor has any number of required and optional
Properties, depending on the Processor's function. The Processor generally has a `success` and a `failure`
relationship. The `failure` relationship is generally used when the input file is not in the expected format.
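As a sketch of those class-level declarations, such a Processor might be annotated and define its
Relationships as follows. The class name and description text are illustrative only, and package names
reflect recent NiFi versions:

[source,java]
----
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

@SideEffectFree
@SupportsBatching
public class ModifyContent extends AbstractProcessor {

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles whose content was successfully modified")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles whose content was not in the expected format")
            .build();

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // the body of this method is sketched in the next section;
        // getRelationships() and any PropertyDescriptors are omitted for brevity
    }
}
----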
This Processor obtains a FlowFile and updates it using the ProcessSession's `write(StreamCallback)` method
so that it is able to both read from the FlowFile's content and write to the next version of the FlowFile's
content. If errors are encountered during the callback, the callback throws a `ProcessException`. The call
to the ProcessSession's `write` method is wrapped in a `try/catch` block that catches `ProcessException`
and routes the FlowFile to `failure`.

If the callback succeeds, a CONTENT_MODIFIED Provenance Event is emitted.



== Error Handling

When writing a Processor, there are several different unexpected cases that can occur. It is important that
Processor developers understand the mechanics of how the NiFi framework behaves if Processors do not handle
errors themselves, and it is important to understand what error handling is expected of Processors. Here, we
will discuss how Processors should handle unexpected errors during the course of their work.


=== Exceptions within the Processor

During the execution of the `onTrigger` method of a Processor, many things can potentially go awry.
Common failure conditions include:

 - Incoming data is not in the expected format.
 - Network connections to external services fail.
 - Reading or writing data to a disk fails.
 - There is a bug in the Processor or a dependent library.

Any of these conditions can result in an Exception being thrown from the Processor. From the framework's
perspective, there are two types of Exceptions that can escape a Processor: `ProcessException` and
all others.

If a ProcessException is thrown from the Processor, the framework will assume that this is a failure that
is a known outcome. Moreover, it is a condition where attempting to process the data again later may
be successful. As a result, the framework will roll back the session that was being processed and penalize
the FlowFiles that were being processed.

If any other Exception escapes the Processor, though, the framework will assume that it is a failure that
was not taken into account by the developer. In this case, the framework will also roll back the session
and penalize the FlowFiles. However, in this case, we can get into some very problematic situations. For example,
the Processor may be in a bad state and may continually run, depleting system resources, without performing
any useful work. This is fairly common, for instance, when a NullPointerException is thrown continually.
In order to avoid this situation, if an Exception other than ProcessException is able to escape the Processor's
`onTrigger` method, the framework will also "Administratively Yield" the Processor. This means that the
Processor will not be triggered to run again for some amount of time. The amount of time is configured
in the `nifi.properties` file but is 10 seconds by default.


=== Exceptions within a callback: IOException, RuntimeException

More often than not, when an Exception occurs in a Processor, it occurs from within a callback (i.e.,
`InputStreamCallback`, `OutputStreamCallback`, or `StreamCallback`) during the processing of a FlowFile's
content. Callbacks are allowed to throw either `RuntimeException` or `IOException`. In the case of a
RuntimeException, the Exception will propagate back to the `onTrigger` method. In the case of an
`IOException`, the Exception will be wrapped within a ProcessException, and that ProcessException will then
be thrown by the framework.
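Putting the Enrich/Modify pattern and this exception behavior together, a minimal sketch of such an
`onTrigger` method might look like the following. The `transformContent` helper is hypothetical, and
`REL_SUCCESS` and `REL_FAILURE` are assumed to be defined as shown earlier:

[source,java]
----
FlowFile flowFile = session.get();
if (flowFile == null) {
    return;
}

try {
    flowFile = session.write(flowFile, new StreamCallback() {
        @Override
        public void process(final InputStream in, final OutputStream out) throws IOException {
            // transformContent is a hypothetical helper that reads the original
            // content from 'in' and writes the modified content to 'out'
            transformContent(in, out);
        }
    });

    // the content was changed, so emit a CONTENT_MODIFIED provenance event
    session.getProvenanceReporter().modifyContent(flowFile);
    session.transfer(flowFile, REL_SUCCESS);
} catch (final ProcessException pe) {
    getLogger().error("Unable to modify content of {}", new Object[] {flowFile}, pe);
    session.transfer(flowFile, REL_FAILURE);
}
----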
Because of this wrapping, it is recommended that Processors that use callbacks do so within a `try/catch` block
and catch `ProcessException` as well as any other `RuntimeException` that they expect their callback to
throw. It is *not* recommended that Processors catch the general `Exception` or `Throwable` cases, however.
This is discouraged for two reasons.

First, if an unexpected RuntimeException is thrown, it is likely a bug, and allowing the framework to roll back
the session will ensure no data loss and will ensure that DataFlow Managers are able to deal with the data as
they see fit by keeping the data queued up in place.

Second, when an IOException is thrown from a callback, there really are two types of IOExceptions: those thrown
from Processor code (for example, because the data is not in the expected format or a network connection fails), and
those that are thrown from the Content Repository (where the FlowFile content is stored). If the latter is the case,
the framework will catch this IOException and wrap it into a `FlowFileAccessException`, which extends `RuntimeException`.
This is done explicitly so that the Exception will escape the `onTrigger` method and the framework can handle this
condition appropriately. Catching the general Exception prevents this from happening.


=== Penalization vs. Yielding

When an issue occurs during processing, the framework exposes two methods to allow Processor developers to avoid performing
unnecessary work: "penalization" and "yielding." These two concepts can become confusing for developers new to the NiFi API.
A developer is able to penalize a FlowFile by calling the `penalize(FlowFile)` method of the ProcessSession. This causes the
FlowFile itself to be inaccessible to downstream Processors for a period of time. The amount of time that the FlowFile is
inaccessible is determined by the DataFlow Manager by setting the "Penalty Duration" setting in the Processor Configuration
dialog. The default value is 30 seconds. Typically, this is done when a Processor determines that the data cannot be processed
due to environmental reasons that are expected to sort themselves out. A great example of this is the PutSFTP processor, which
will penalize a FlowFile if a file with the same filename already exists on the SFTP server. In this case, the Processor
penalizes the FlowFile and routes it to `failure`. A DataFlow Manager can then route `failure` back to the same PutSFTP Processor.
This way, if a file exists with the same filename, the Processor will not attempt to send the file again for 30 seconds
(or whatever period the DFM has configured the Processor to use). In the meantime, it is able to continue to process other
FlowFiles.

On the other hand, yielding allows a Processor developer to indicate to the framework that it will not be able to perform
any useful function for some period of time. This commonly happens with a Processor that is communicating with a remote
resource. If the Processor cannot connect to the remote resource, or if the remote resource is expected to provide data
but reports that it has none, the Processor should call `yield` on the `ProcessContext` object and then return. By doing
this, the Processor is telling the framework that it should not waste resources triggering this Processor to run, because
there is nothing it can do; it is better to use those resources to allow other Processors to run.
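In code, the two mechanisms look like the following sketch. The two cases are alternatives, not sequential
steps, and the `REL_FAILURE` Relationship is assumed to be defined on the Processor:

[source,java]
----
// Penalization: this particular FlowFile cannot be processed right now (for
// example, a file with the same name already exists on the remote server), but
// other FlowFiles can still be processed. Penalize it and route it to failure.
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);

// Yielding: the Processor itself can do no useful work right now (for example,
// the remote resource is unreachable). Tell the framework to stop triggering
// this Processor for a while, and then return.
context.yield();
return;
----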
=== Session Rollback

Thus far, when we have discussed the `ProcessSession`, we have typically referred to it simply as a mechanism for accessing
FlowFiles. However, it provides another very important capability, which is transactionality. All methods that are called
on a ProcessSession happen as a transaction. When we decide to end the transaction, we can do so either by calling
`commit()` or by calling `rollback()`. Typically, this is handled by the `AbstractProcessor` class: if the `onTrigger` method
throws an Exception, the AbstractProcessor will catch the Exception, call `session.rollback()`, and then re-throw the Exception.
Otherwise, the AbstractProcessor will call `commit()` on the ProcessSession.

There are times, however, when developers will want to roll back a session explicitly. This can be accomplished at any time
by calling the `rollback()` or `rollback(boolean)` method. If using the latter, the boolean indicates whether or not the
FlowFiles that have been pulled from queues (via the ProcessSession's `get` methods) should be penalized before being added
back to their queues.

When `rollback` is called, any modifications that have occurred to the FlowFiles in that session are discarded, including
both content modifications and attribute modifications. Additionally, all Provenance Events are rolled back (with the exception
of any SEND event that was emitted by passing a value of `true` for the `force` argument). The FlowFiles that were pulled from
the input queues are then transferred back to the input queues (and optionally penalized) so that they can be processed again.

On the other hand, when the `commit` method is called, the FlowFile's new state is persisted in the FlowFile Repository, and
any Provenance Events that occurred are persisted in the Provenance Repository. The previous content is destroyed (unless
another FlowFile references the same piece of content), and the FlowFiles are transferred to the outbound queues so that the
next Processors can operate on the data.

It is also important to note how this behavior is affected by the `org.apache.nifi.annotation.behavior.SupportsBatching`
annotation. If a Processor utilizes this annotation, calls to `ProcessSession.commit` may not take effect immediately. Rather,
these commits may be batched together in order to provide higher throughput. However, if at any point the Processor rolls back
the ProcessSession, all changes since the last call to `commit` will be discarded and all "batched" commits will take effect.
These "batched" commits are not rolled back.




== General Design Considerations

When designing a Processor, there are a few important design considerations to keep in mind. This section of the Developer Guide
brings to the forefront some of the ideas that a developer should be thinking about when creating a Processor.

=== Consider the User

One of the most important concepts to keep in mind when developing a Processor (or any other component) is the user
experience that you are creating. It is important to remember that, as the developer of such a component, you may have
important knowledge about the context that others do not have. Documentation should always be supplied so that those
less familiar with the process are able to use it with ease.

When thinking about the user experience, it is also important to note that consistency is very important. It is best
to stick with the standard <<naming-conventions>>.
This is true for Processor names, Property names and values, Relationship
names, and any other aspect that the user will experience.

Simplicity is crucial! Avoid adding properties that you don't expect users to understand or change. As developers, we are
told that hard-coding values is bad. But this sometimes results in developers exposing properties that should not be
exposed and then, when asked for clarification, telling users to just leave the default value. This leads to confusion
and complexity.


=== Cohesion and Reusability

For the sake of making a single, cohesive unit, developers are sometimes tempted to combine several functions into a single Processor.
This is very often the case when a Processor expects input data to be in format X so that the Processor can convert the data into
format Y and send the newly-formatted data to some external service.

Taking this approach of formatting the data for a particular endpoint and then sending the data to that endpoint within the same Processor
has several drawbacks:

 - The Processor becomes very complex, as it has to perform the data translation task as well as the task of
   sending the data to the remote service.
 - If the Processor is unable to communicate with the remote service, it will route the data to a `failure` Relationship. In this case,
   the Processor will be responsible for performing the data translation again. And if it fails again, the translation is done yet again.
 - If we have five different Processors that translate the incoming data into this new format before sending the data, we have a great
   deal of duplicated code. If the schema changes, for instance, many Processors must be updated.
 - The intermediate data is thrown away when the Processor finishes sending to the remote service. The intermediate data format
   may well be useful to other Processors.

In order to avoid these issues, and to make Processors more reusable, a Processor should always stick to the principle of "do one thing and do
it well." Such a Processor should be broken into two separate Processors: one to convert the data from Format X to Format Y, and another
Processor to send the data to the remote resource.


[[naming-conventions]]
=== Naming Conventions

In order to deliver a consistent look and feel to users, it is advisable that Processors keep with standard naming conventions. The following
is a list of standard conventions that are used:

 - Processors that pull data from a remote system are named Get<Service> or Get<Protocol>, depending on whether they poll data from arbitrary
   sources over a known Protocol (such as GetHTTP or GetFTP) or pull data from a known Service (such as GetKafka).
 - Processors that push data to a remote system are named Put<Service> or Put<Protocol>.
 - Relationship names are lower-cased and use spaces to delineate words.
 - Property names capitalize significant words, as would be done with the title of a book.



=== Processor Behavior Annotations

When creating a Processor, the developer is able to provide hints to the framework about how to utilize the Processor most
effectively. This is done by applying annotations to the Processor's class. The annotations that can be applied to a
Processor exist in three sub-packages of `org.apache.nifi.annotation`. Those in the `documentation` sub-package are used
to provide documentation to the user.
Those in the `lifecycle` sub-package instruct the framework which methods should
be called on the Processor in order to respond to the appropriate life-cycle events. Those in the `behavior` sub-package
help the framework understand how to interact with the Processor in terms of scheduling and general behavior.

The following annotations from the `org.apache.nifi.annotation.behavior` package can be used to modify how the framework
will handle your Processor:

 - `EventDriven`: Instructs the framework that the Processor can be scheduled using the Event-Driven scheduling
   strategy. This strategy is still experimental at this point, but it can result in reduced resource utilization
   on dataflows that do not handle extremely high data rates.

 - `SideEffectFree`: Indicates that the Processor does not have any side effects external to NiFi. As a result, the
   framework is free to invoke the Processor many times with the same input without causing any unexpected
   results to occur. This implies idempotent behavior. This can be used by the framework to improve efficiency by
   performing actions such as transferring a ProcessSession from one Processor to another, such that if
   a problem occurs, many Processors' actions can be rolled back and performed again.

 - `SupportsBatching`: This annotation indicates that it is okay for the framework to batch together multiple
   ProcessSession commits into a single commit. If this annotation is present, the user will be able to choose
   whether they prefer high throughput or lower latency in the Processor's Scheduling tab. This annotation should
   be applied to most Processors, but it comes with a caveat: if the Processor calls `ProcessSession.commit`,
   there is no guarantee that the data has been safely stored in NiFi's Content, FlowFile, and Provenance Repositories.
   As a result, it is not appropriate for those Processors that receive data from an external source, commit the session,
   and then delete the remote data or confirm a transaction with a remote resource.

 - `TriggerSerially`: When this annotation is present, the framework will not allow the user to schedule more than one
   concurrent thread to execute the `onTrigger` method at a time. Instead, the number of threads ("Concurrent Tasks")
   will always be set to `1`. This does *not*, however, mean that the Processor does not have to be thread-safe,
   as the thread that is executing `onTrigger` may change between invocations.

 - `TriggerWhenAnyDestinationAvailable`: By default, NiFi will not schedule a Processor to run if any of its outbound
   queues is full. This allows back-pressure to be applied all the way back up a chain of Processors. However, some Processors
   may need to run even if one of the outbound queues is full. This annotation indicates that the Processor should run
   if any Relationship is "available." A Relationship is said to be "available" if none of the connections that use
   that Relationship is full. For example, the DistributeLoad Processor makes use of this annotation. If the "round robin"
   scheduling strategy is used, the Processor will not run if any outbound queue is full. However, if the "next available"
   scheduling strategy is used, the Processor will run if any Relationship at all is available, and it will route FlowFiles
   only to those Relationships that are available.
 - `TriggerWhenEmpty`: The default behavior is to trigger a Processor to run only if its input queue has at least one
   FlowFile or if the Processor has no input queues (which is typical of a "source" Processor). Applying this annotation
   will cause the framework to ignore the size of the input queues and trigger the Processor regardless of whether or
   not there is any data on an input queue. This is useful, for example, if the Processor needs to be triggered to run
   periodically in order to time out a network connection.


=== Data Buffering

An important point to keep in mind is that NiFi provides a generic data processing capability. Data can be in any format. Processors
are generally scheduled with several threads. A common mistake that developers new to NiFi make is to buffer the entire contents of a
FlowFile in memory. While there are cases when this is required, it should be avoided if at all possible, unless it is well known
what format the data is in. For example, a Processor responsible for executing XPath against an XML document will need to load the
entire contents of the data into memory. This is generally acceptable, as XML is not expected to be extremely large. However, a Processor
that searches for a specific byte sequence may be used to search files that are hundreds of gigabytes or more. Attempting to load this
into memory can cause a lot of problems, especially if multiple threads are processing different FlowFiles simultaneously.

Instead of buffering this data into memory, it is advisable to instead evaluate the data as it is streamed from the Content Repository
(i.e., scan the content from the `InputStream` that is provided to your callback by `ProcessSession.read`). Of course, in this case,
we don't want to read from the Content Repository one byte at a time, so we would use a BufferedInputStream or otherwise buffer some
small amount of data, as appropriate.





[[controller-services]]
== Controller Services

The `ControllerService` interface allows developers to share functionality and state across the JVM in a clean
and consistent manner. The interface resembles the `Processor` interface but does not have an `onTrigger` method,
because Controller Services are not scheduled to run periodically, and Controller Services do not have Relationships,
because they are not integrated into the flow directly. Rather, they are used by Processors, Reporting Tasks, and
other Controller Services.

[[developing-controller-service]]
=== Developing a ControllerService

Just like the Processor interface, the ControllerService interface exposes methods for configuration,
validation, and initialization. These methods are all identical to those of the Processor interface
except that the `initialize` method is passed a `ControllerServiceInitializationContext` rather
than a `ProcessorInitializationContext`.

Controller Services come with an additional constraint that Processors do not have. A Controller Service
must be defined by an interface that extends `ControllerService`. Implementations can then be interacted
with only through their interface. A Processor, for instance, will never be given a concrete implementation of
a ControllerService and therefore must reference the service only via interfaces that extend `ControllerService`.
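As a minimal sketch of such a definition, a hypothetical service that exposes a shared, in-memory dataset
might look like this; the interface name and method are illustrative only:

[source,java]
----
import org.apache.nifi.controller.ControllerService;

// This interface is all that Processors ever see. It lives in its own NAR so
// that both the Processor's NAR and the implementation's NAR can depend on it.
public interface DatasetLookupService extends ControllerService {

    /**
     * Returns the value associated with the given key in the shared dataset,
     * or null if the key is not present.
     */
    String lookup(String key);
}
----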
This constraint is in place mainly because a Processor can exist in one NiFi Archive (NAR) while the implementation
of the Controller Service that the Processor uses can exist in a different NAR. This is accomplished by
the framework by dynamically implementing the exposed interfaces in such a way that the framework can
switch to the appropriate ClassLoader and invoke the desired method on the concrete implementation. However,
in order to make this work, the Processor and the Controller Service implementation must share the same definition
of the Controller Service interface. Therefore, both of these NARs must depend on the NAR that houses the
Controller Service's interface. See <<nars>> for more information.


[[interacting-with-controller-service]]
=== Interacting with a ControllerService

ControllerServices may be obtained by a Processor, another ControllerService, or a ReportingTask
by means of the ControllerServiceLookup or by using the `identifiesControllerService` method of the
PropertyDescriptor's Builder class. The ControllerServiceLookup can be obtained by a Processor from the
ProcessorInitializationContext that is passed to the `initialize` method. Likewise, it is obtained by
a ControllerService from the ControllerServiceInitializationContext and by a ReportingTask via the
ReportingConfiguration object passed to the `initialize` method.

For most use cases, though, using the `identifiesControllerService` method of a PropertyDescriptor Builder
is preferred and is the least complicated method. In order to use this method, we create a PropertyDescriptor
that references a Controller Service as such:

[source,java]
----
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    .name("SSL Context Service")
    .description("Specifies the SSL Context Service that can be used to create secure connections")
    .required(true)
    .identifiesControllerService(SSLContextService.class)
    .build();
----

Using this method, the user will be prompted to supply the SSL Context Service that should be used. This is
done by providing the user with a drop-down menu from which they are able to choose any of the SSLContextService
configurations that have been configured, regardless of the implementation.

In order to make use of this service, the Processor can use code such as:

[source,java]
----
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE)
    .asControllerService(SSLContextService.class);
----

Note here that `SSLContextService` is an interface that extends ControllerService. The only implementation
at this time is the `StandardSSLContextService`. However, the Processor developer need not worry about this
detail.




== Reporting Tasks

So far, we have mentioned little about how to convey to the outside world how NiFi and its components
are performing. Is the system able to keep up with the incoming data rate? How much more can
the system handle? How much data is processed at the peak time of day versus the least busy time of day?

In order to answer these questions, and many more, NiFi provides a capability for reporting status,
statistics, metrics, and monitoring information to external services by means of the `ReportingTask`
interface. ReportingTasks are given access to a host of information to determine how the system is performing.
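As a preview of the API described in the next section, a bare-bones ReportingTask might look like the
following sketch. It assumes the `AbstractReportingTask` base class and the accessor names shown here
(`getQueuedCount`, `getQueuedContentSize`, `createBulletin`); exact names may vary between NiFi versions:

[source,java]
----
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.Severity;

public class QueueDepthReportingTask extends AbstractReportingTask {

    @Override
    public void onTrigger(final ReportingContext context) {
        // statistics for the root Process Group, covering the past five minutes
        final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();

        final String message = String.format("%d FlowFiles (%d bytes) are currently queued",
            status.getQueuedCount(), status.getQueuedContentSize());

        // submit a Bulletin so that the information is rendered to users
        final Bulletin bulletin = context.createBulletin("Monitoring", Severity.INFO, message);
        context.getBulletinRepository().addBulletin(bulletin);
    }
}
----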
=== Developing a Reporting Task

Just like the Processor and ControllerService interfaces, the ReportingTask interface exposes methods for
configuration, validation, and initialization. These methods are all identical to those of the
Processor and ControllerService interfaces, except that the `initialize` method is passed a `ReportingConfiguration`
object, as opposed to the initialization objects received by the other Components. The ReportingTask also has
an `onTrigger` method that is invoked by the framework to trigger the task to perform its job.

Within the `onTrigger` method, the ReportingTask is given access to a ReportingContext, from which configuration
and information about the NiFi instance can be obtained. The BulletinRepository allows Bulletins to be queried
and allows the ReportingTask to submit its own Bulletins so that the information will be rendered to users. The
ControllerServiceLookup that is accessible via the Context provides access to ControllerServices that have been
configured. However, this is not the preferred method of obtaining a Controller Service. Rather, the
preferred method is to reference the Controller Service in a PropertyDescriptor,
as is discussed in the <<interacting-with-controller-service>> section.

The `EventAccess` object that is exposed via the ReportingContext provides access to the `ProcessGroupStatus`,
which exposes statistics about the amount of data processed in the past five minutes by Process Groups,
Processors, Connections, and other Components. Additionally, the EventAccess object provides access to
the ++ProvenanceEventRecord++s that have been stored in the `ProvenanceEventRepository`. These
Provenance Events are emitted by Processors when data is received from external sources, emitted to external services,
removed from the system, modified, or routed according to some decision that was made.

Each ProvenanceEvent has the ID of the FlowFile, the type of Event, the creation time of the Event, and
all FlowFile attributes associated with the FlowFile at the time that the FlowFile was accessed by the component,
as well as the FlowFile attributes that were associated with the FlowFile as a result of the processing that the
event describes. This provides a great deal of information to ReportingTasks, allowing reports to be generated
in many different ways to expose the metrics and monitoring capabilities needed for any number of operational concerns.






== Testing

Testing the components that will be used within a larger framework can often be very cumbersome
and tricky. With NiFi, we strive to make testing components as easy as possible. In order to do this,
<TRUNCATED>