NIFI-271 chipping away - more work left in standard bundle
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/54818893 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/54818893 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/54818893 Branch: refs/heads/develop Commit: 548188939c44ed02d8b0ef8e0bbd4513afaade60 Parents: 87e8296 Author: joewitt <joew...@apache.org> Authored: Mon Apr 27 07:44:57 2015 -0400 Committer: joewitt <joew...@apache.org> Committed: Mon Apr 27 07:44:57 2015 -0400 ---------------------------------------------------------------------- .../web/StandardContentViewerController.java | 9 +- .../standard/AbstractJsonPathProcessor.java | 19 +- .../standard/Base64EncodeContent.java | 11 +- .../nifi/processors/standard/BinFiles.java | 170 ++-- .../processors/standard/CompressContent.java | 38 +- .../nifi/processors/standard/ControlRate.java | 39 +- .../standard/ConvertCharacterSet.java | 37 +- .../processors/standard/DetectDuplicate.java | 28 +- .../processors/standard/DistributeLoad.java | 89 +- .../nifi/processors/standard/EncodeContent.java | 27 +- .../processors/standard/EncryptContent.java | 23 +- .../processors/standard/EvaluateJsonPath.java | 91 ++- .../standard/EvaluateRegularExpression.java | 31 +- .../nifi/processors/standard/EvaluateXPath.java | 89 +- .../processors/standard/EvaluateXQuery.java | 63 +- .../processors/standard/ExecuteProcess.java | 19 +- .../standard/ExecuteStreamCommand.java | 71 +- .../nifi/processors/standard/ExtractText.java | 26 +- .../processors/standard/GenerateFlowFile.java | 10 +- .../apache/nifi/processors/standard/GetFTP.java | 14 +- .../nifi/processors/standard/GetFile.java | 32 +- .../processors/standard/GetFileTransfer.java | 9 +- .../nifi/processors/standard/GetHTTP.java | 69 +- .../nifi/processors/standard/GetJMSTopic.java | 50 +- .../nifi/processors/standard/GetSFTP.java | 16 +- .../processors/standard/HandleHttpRequest.java | 468 ++++++----- 
.../processors/standard/HandleHttpResponse.java | 127 +-- .../nifi/processors/standard/HashAttribute.java | 38 +- .../nifi/processors/standard/HashContent.java | 18 +- .../processors/standard/IdentifyMimeType.java | 17 +- .../nifi/processors/standard/InvokeHTTP.java | 149 ++-- .../nifi/processors/standard/JmsConsumer.java | 100 +-- .../nifi/processors/standard/ListenHTTP.java | 18 +- .../nifi/processors/standard/ListenUDP.java | 133 ++- .../nifi/processors/standard/LogAttribute.java | 5 +- .../nifi/processors/standard/MergeContent.java | 180 ++-- .../nifi/processors/standard/ModifyBytes.java | 3 - .../processors/standard/MonitorActivity.java | 27 +- .../nifi/processors/standard/PostHTTP.java | 152 ++-- .../nifi/processors/standard/PutEmail.java | 423 +++++----- .../apache/nifi/processors/standard/PutFTP.java | 56 +- .../nifi/processors/standard/PutFile.java | 264 +++--- .../processors/standard/PutFileTransfer.java | 127 ++- .../apache/nifi/processors/standard/PutJMS.java | 133 ++- .../nifi/processors/standard/PutSFTP.java | 9 +- .../nifi/processors/standard/ReplaceText.java | 163 ++-- .../standard/ReplaceTextWithMapping.java | 215 +++-- .../processors/standard/RouteOnAttribute.java | 111 ++- .../processors/standard/RouteOnContent.java | 149 ++-- .../nifi/processors/standard/ScanAttribute.java | 115 ++- .../nifi/processors/standard/ScanContent.java | 83 +- .../processors/standard/SegmentContent.java | 75 +- .../nifi/processors/standard/SplitContent.java | 163 ++-- .../nifi/processors/standard/SplitJson.java | 74 +- .../nifi/processors/standard/SplitText.java | 142 ++-- .../nifi/processors/standard/SplitXml.java | 61 +- .../nifi/processors/standard/TransformXml.java | 121 +-- .../nifi/processors/standard/UnpackContent.java | 216 +++-- .../nifi/processors/standard/ValidateXml.java | 45 +- .../servlets/ContentAcknowledgmentServlet.java | 53 +- .../standard/servlets/ListenHTTPServlet.java | 156 ++-- .../nifi/processors/standard/util/Bin.java | 6 +- 
.../processors/standard/util/BinManager.java | 26 +- .../standard/util/DocumentReaderCallback.java | 3 +- .../processors/standard/util/FTPTransfer.java | 351 +++++--- .../nifi/processors/standard/util/FTPUtils.java | 38 +- .../nifi/processors/standard/util/FileInfo.java | 3 +- .../processors/standard/util/FileTransfer.java | 356 ++++---- .../processors/standard/util/JmsFactory.java | 127 ++- .../standard/util/JmsProcessingSummary.java | 100 ++- .../processors/standard/util/JmsProperties.java | 257 +++--- .../util/JsonPathExpressionValidator.java | 107 ++- .../standard/util/SFTPConnection.java | 9 - .../processors/standard/util/SFTPTransfer.java | 342 +++++--- .../processors/standard/util/SFTPUtils.java | 179 ++-- .../standard/util/UDPStreamConsumer.java | 25 +- .../util/ValidatingBase32InputStream.java | 1 - .../standard/util/WrappedMessageConsumer.java | 9 +- .../standard/util/WrappedMessageProducer.java | 9 +- .../standard/util/XmlSplitterSaxParser.java | 11 +- .../additionalDetails.html | 6 +- .../additionalDetails.html | 2 +- .../org/apache/tika/mime/custom-mimetypes.xml | 144 ++-- .../src/test/java/TestIngestAndUpdate.java | 3 +- .../processors/standard/CaptureServlet.java | 12 +- .../processors/standard/HelloWorldServlet.java | 3 +- .../standard/RESTServiceContentModified.java | 15 +- .../standard/TestBase64EncodeContent.java | 41 +- .../standard/TestCompressContent.java | 96 ++- .../processors/standard/TestControlRate.java | 6 +- .../standard/TestConvertCharacterSet.java | 13 +- .../standard/TestDetectDuplicate.java | 36 +- .../processors/standard/TestDistributeLoad.java | 21 +- .../processors/standard/TestEncodeContent.java | 66 +- .../processors/standard/TestEncryptContent.java | 29 +- .../standard/TestEvaluateJsonPath.java | 222 +++-- .../processors/standard/TestEvaluateXPath.java | 105 ++- .../processors/standard/TestEvaluateXQuery.java | 312 ++++--- .../processors/standard/TestExecuteProcess.java | 36 +- .../standard/TestExecuteStreamCommand.java | 131 
++- .../processors/standard/TestExtractText.java | 104 ++- .../nifi/processors/standard/TestGetFile.java | 63 +- .../nifi/processors/standard/TestGetHTTP.java | 112 +-- .../processors/standard/TestGetJMSQueue.java | 63 +- .../standard/TestHandleHttpRequest.java | 49 +- .../standard/TestHandleHttpResponse.java | 109 ++- .../processors/standard/TestHashAttribute.java | 6 +- .../processors/standard/TestHashContent.java | 4 +- .../standard/TestIdentifyMimeType.java | 15 +- .../processors/standard/TestInvokeHTTP.java | 94 ++- .../processors/standard/TestJmsConsumer.java | 224 ++--- .../nifi/processors/standard/TestListenUDP.java | 38 +- .../processors/standard/TestMergeContent.java | 180 ++-- .../processors/standard/TestModifyBytes.java | 81 +- .../standard/TestMonitorActivity.java | 78 +- .../nifi/processors/standard/TestPostHTTP.java | 189 +++-- .../nifi/processors/standard/TestPutEmail.java | 84 +- .../processors/standard/TestReplaceText.java | 81 +- .../standard/TestReplaceTextLineByLine.java | 203 +++-- .../standard/TestReplaceTextWithMapping.java | 318 +++++--- .../standard/TestRouteOnAttribute.java | 64 +- .../processors/standard/TestRouteOnContent.java | 18 +- .../processors/standard/TestScanAttribute.java | 15 +- .../processors/standard/TestScanContent.java | 34 +- .../processors/standard/TestSegmentContent.java | 13 +- .../nifi/processors/standard/TestServer.java | 84 +- .../processors/standard/TestSplitContent.java | 213 +++-- .../nifi/processors/standard/TestSplitJson.java | 91 ++- .../nifi/processors/standard/TestSplitText.java | 70 +- .../nifi/processors/standard/TestSplitXml.java | 3 +- .../processors/standard/TestTransformXml.java | 40 +- .../processors/standard/TestUnpackContent.java | 129 ++- .../processors/standard/TestValidateXml.java | 3 +- .../test/resources/TestJson/json-sample.json | 814 +++++++++---------- .../ControllerStatusReportingTask.java | 19 +- .../apache/nifi/controller/MonitorMemory.java | 3 +- 136 files changed, 7415 insertions(+), 
5020 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-content-viewer/src/main/java/org/apache/nifi/web/StandardContentViewerController.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-content-viewer/src/main/java/org/apache/nifi/web/StandardContentViewerController.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-content-viewer/src/main/java/org/apache/nifi/web/StandardContentViewerController.java index 6ce315e..98f7683 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-content-viewer/src/main/java/org/apache/nifi/web/StandardContentViewerController.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-content-viewer/src/main/java/org/apache/nifi/web/StandardContentViewerController.java @@ -35,9 +35,6 @@ import javax.xml.transform.stream.StreamSource; import org.apache.nifi.web.ViewableContent.DisplayMode; import org.codehaus.jackson.map.ObjectMapper; -/** - * - */ @WebServlet(name = "StandardContentViewer", urlPatterns = {"/view-content"}) public class StandardContentViewerController extends HttpServlet { @@ -51,11 +48,11 @@ public class StandardContentViewerController extends HttpServlet { @Override protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { final ViewableContent content = (ViewableContent) request.getAttribute(ViewableContent.CONTENT_REQUEST_ATTRIBUTE); - + // handle json/xml if ("application/json".equals(content.getContentType()) || "application/xml".equals(content.getContentType()) || "text/plain".equals(content.getContentType())) { final String formatted; - + // leave the content alone if specified if (DisplayMode.Original.equals(content.getDisplayMode())) { formatted = 
content.getContent(); @@ -90,7 +87,7 @@ public class StandardContentViewerController extends HttpServlet { formatted = content.getContent(); } } - + // defer to the jsp request.setAttribute("mode", content.getContentType()); request.setAttribute("content", formatted); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java index 93b835e..9e77dab 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java @@ -42,15 +42,16 @@ import java.util.Map; import java.util.Objects; /** - * Provides common functionality used for processors interacting and manipulating JSON data via JsonPath. + * Provides common functionality used for processors interacting and + * manipulating JSON data via JsonPath. 
* * @see <a href="http://json.org">http://json.org</a> - * @see <a href="https://github.com/jayway/JsonPath">https://github.com/jayway/JsonPath</a> + * @see + * <a href="https://github.com/jayway/JsonPath">https://github.com/jayway/JsonPath</a> */ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { - private static final Configuration STRICT_PROVIDER_CONFIGURATION = - Configuration.builder().jsonProvider(new JsonSmartJsonProvider(JSONParser.MODE_RFC4627)).build(); + private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JsonSmartJsonProvider(JSONParser.MODE_RFC4627)).build(); private static final JsonProvider JSON_PROVIDER = STRICT_PROVIDER_CONFIGURATION.jsonProvider(); @@ -89,8 +90,9 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { } /** - * Determines the context by which JsonSmartJsonProvider would treat the value. {@link java.util.Map} and - * {@link java.util.List} objects can be rendered as JSON elements, everything else is treated as a scalar. + * Determines the context by which JsonSmartJsonProvider would treat the + * value. {@link java.util.Map} and {@link java.util.List} objects can be + * rendered as JSON elements, everything else is treated as a scalar. 
* * @param obj item to be inspected if it is a scalar or a JSON element * @return false, if the object is a supported type; true otherwise @@ -126,12 +128,11 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { /** * An optional hook to act on the compute value */ - abstract void cacheComputedValue(String subject, String input, JsonPath computedJsonPath); /** - * A hook for implementing classes to determine if a cached value is stale for a compiled JsonPath represented - * by either a validation + * A hook for implementing classes to determine if a cached value is + * stale for a compiled JsonPath represented by either a validation */ abstract boolean isStale(String subject, String input); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java index fa930d0..d0b78a5 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java @@ -63,9 +63,14 @@ public class Base64EncodeContent extends AbstractProcessor { .allowableValues(ENCODE_MODE, DECODE_MODE) .defaultValue(ENCODE_MODE) .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully encoded or decoded will be routed to success").build(); - 
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be encoded or decoded will be routed to failure").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any FlowFile that is successfully encoded or decoded will be routed to success") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be encoded or decoded will be routed to failure") + .build(); private List<PropertyDescriptor> properties; private Set<Relationship> relationships; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java index 3d7dba1..6452c23 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java @@ -87,18 +87,24 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder() .name("Max Bin Age") - .description("The maximum age of a Bin that will trigger a Bin to be complete. 
Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours") + .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> " + + "where <duration> is a positive integer and time unit is one of seconds, minutes, hours") .required(false) .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) .build(); - public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The FlowFiles that were used to create the bundle").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure").build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The FlowFiles that were used to create the bundle") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure") + .build(); private final BinManager binManager = new BinManager(); private final Queue<Bin> readyBins = new LinkedBlockingQueue<>(); - @OnStopped public final void resetState() { binManager.purge(); @@ -111,80 +117,63 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { } } - - /** - * Allows general pre-processing of a flow file before it is offered to a - * bin. This is called before getGroupId(). - * - * @param context - * @param session - * @param flowFile - * @return The flow file, possibly altered - */ + /** + * Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId(). 
+ * + * @param context context + * @param session session + * @param flowFile flowFile + * @return The flow file, possibly altered + */ protected abstract FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile); - + /** - * Returns a group ID representing a bin. This allows flow files to be - * binned into like groups. - * @param context - * @param flowFile + * Returns a group ID representing a bin. This allows flow files to be binned into like groups. + * + * @param context context + * @param flowFile flowFile * @return The appropriate group ID */ protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile); /** - * Performs any additional setup of the bin manager. Called during the - * OnScheduled phase. + * Performs any additional setup of the bin manager. Called during the OnScheduled phase. + * * @param binManager The bin manager - * @param context + * @param context context */ protected abstract void setUpBinManager(BinManager binManager, ProcessContext context); - + /** - * Processes a single bin. Implementing class is responsible for committing - * each session - * - * @param unmodifiableBin - * A reference to a single bin of flow file/session wrappers - * @param binContents - * A copy of the contents of the bin - * @param context - * The context - * @param session - * The session that created the bin - * @param logger - * The logger - * @return Return true if the input bin was already committed. E.g., in case of a - * failure, the implementation may choose to transfer all binned files - * to Failure and commit their sessions. If false, the - * processBins() method will transfer the files to Original and commit - * the sessions - * - * @throws ProcessException if any problem arises while processing a bin - * of FlowFiles. 
All flow files in the - * bin will be transferred to failure and the ProcessSession provided by - * the 'session' argument rolled back - */ - protected abstract boolean processBin(Bin unmodifiableBin, - List<FlowFileSessionWrapper> binContents, ProcessContext context, ProcessSession session) throws ProcessException; + * Processes a single bin. Implementing class is responsible for committing each session + * + * @param unmodifiableBin A reference to a single bin of flow file/session wrappers + * @param binContents A copy of the contents of the bin + * @param context The context + * @param session The session that created the bin + * @return Return true if the input bin was already committed. E.g., in case of a failure, the implementation may choose to transfer all binned files to Failure and commit their sessions. If + * false, the processBins() method will transfer the files to Original and commit the sessions + * + * @throws ProcessException if any problem arises while processing a bin of FlowFiles. All flow files in the bin will be transferred to failure and the ProcessSession provided by the 'session' + * argument rolled back + */ + protected abstract boolean processBin(Bin unmodifiableBin, List<FlowFileSessionWrapper> binContents, ProcessContext context, ProcessSession session) throws ProcessException; /** - * Allows additional custom validation to be done. This will be called from - * the parent's customValidation method. - * - * @param context - * The context - * @return Validation results indicating problems - */ + * Allows additional custom validation to be done. This will be called from the parent's customValidation method. 
+ * + * @param context The context + * @return Validation results indicating problems + */ protected Collection<ValidationResult> additionalCustomValidation(final ValidationContext context) { - return new ArrayList<ValidationResult>(); + return new ArrayList<>(); } - + @Override public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { int binsAdded = binFlowFiles(context, sessionFactory); - getLogger().debug("Binned {} FlowFiles", new Object[] {binsAdded}); - + getLogger().debug("Binned {} FlowFiles", new Object[]{binsAdded}); + if (!isScheduled()) { return; } @@ -232,11 +221,12 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { final List<FlowFileSessionWrapper> binCopy = new ArrayList<>(bin.getContents()); - boolean binAlreadyCommitted = false; + boolean binAlreadyCommitted = false; try { - binAlreadyCommitted = this.processBin(bin, binCopy, context, session); + binAlreadyCommitted = this.processBin(bin, binCopy, context, session); } catch (final ProcessException e) { - logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e}); + logger. + error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e}); for (final FlowFileSessionWrapper wrapper : binCopy) { wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE); @@ -251,7 +241,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { // across multiple sessions, we cannot guarantee atomicity across the sessions session.commit(); // If this bin's session has been committed, move on. 
- if ( !binAlreadyCommitted ) { + if (!binAlreadyCommitted) { for (final FlowFileSessionWrapper wrapper : bin.getContents()) { wrapper.getSession().transfer(wrapper.getFlowFile(), REL_ORIGINAL); wrapper.getSession().commit(); @@ -260,8 +250,8 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { return 1; } - - private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) { + + private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) { int binsAdded = 0; while (binManager.getBinCount() < context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) { if (!isScheduled()) { @@ -297,21 +287,22 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { public final void onScheduled(final ProcessContext context) throws IOException { binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue()); - if (context.getProperty(MAX_BIN_AGE).isSet() ) { + if (context.getProperty(MAX_BIN_AGE).isSet()) { binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue()); } else { binManager.setMaxBinAge(Integer.MAX_VALUE); } - - if ( context.getProperty(MAX_SIZE).isSet() ) { - binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue()); + + if (context.getProperty(MAX_SIZE).isSet()) { + binManager.setMaximumSize(context.getProperty(MAX_SIZE). 
+ asDataSize(DataUnit.B).longValue()); } else { binManager.setMaximumSize(Long.MAX_VALUE); } - + binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger()); - if ( context.getProperty(MAX_ENTRIES).isSet() ) { + if (context.getProperty(MAX_ENTRIES).isSet()) { binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue()); } else { binManager.setMaximumEntries(Integer.MAX_VALUE); @@ -319,31 +310,46 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { this.setUpBinManager(binManager, context); } - - @Override + + @Override protected final Collection<ValidationResult> customValidate(final ValidationContext context) { - final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context)); + final List<ValidationResult> problems = new ArrayList<>(super. + customValidate(context)); final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue(); final Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B); if (maxBytes != null && maxBytes.longValue() < minBytes) { - problems.add(new ValidationResult.Builder().subject(MIN_SIZE.getName()).input( - context.getProperty(MIN_SIZE).getValue()).valid(false).explanation("Min Size must be less than or equal to Max Size").build()); + problems.add( + new ValidationResult.Builder() + .subject(MIN_SIZE.getName()) + .input(context.getProperty(MIN_SIZE).getValue()) + .valid(false) + .explanation("Min Size must be less than or equal to Max Size") + .build() + ); } - final Long min = context.getProperty(MIN_ENTRIES).asLong(); - final Long max = context.getProperty(MAX_ENTRIES).asLong(); + final Long min = context.getProperty(MIN_ENTRIES). + asLong(); + final Long max = context.getProperty(MAX_ENTRIES). 
+ asLong(); if (min != null && max != null) { if (min > max) { - problems.add(new ValidationResult.Builder().subject(MIN_ENTRIES.getName()).input(context.getProperty(MIN_ENTRIES).getValue()).valid(false).explanation("Min Entries must be less than or equal to Max Entries").build()); + problems.add( + new ValidationResult.Builder().subject(MIN_ENTRIES.getName()) + .input(context.getProperty(MIN_ENTRIES).getValue()) + .valid(false) + .explanation("Min Entries must be less than or equal to Max Entries") + .build() + ); } } - + Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context); if (otherProblems != null) { - problems.addAll(otherProblems); + problems.addAll(otherProblems); } return problems; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java index 5896a62..d9946da 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java @@ -67,9 +67,12 @@ import org.tukaani.xz.XZOutputStream; @SideEffectFree @SupportsBatching @Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2"}) -@CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type attribute as appropriate") 
-@ReadsAttribute(attribute="mime.type", description="If the Compression Format is set to use mime.type attribute, this attribute is used to determine the compression type. Otherwise, this attribute is ignored.") -@WritesAttribute(attribute="mime.type", description="If the Mode property is set to compress, the appropriate MIME Type is set. If the Mode property is set to decompress and the file is successfully decompressed, this attribute is removed, as the MIME Type is no longer known.") +@CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type " + + "attribute as appropriate") +@ReadsAttribute(attribute = "mime.type", description = "If the Compression Format is set to use mime.type attribute, this attribute is used to " + + "determine the compression type. Otherwise, this attribute is ignored.") +@WritesAttribute(attribute = "mime.type", description = "If the Mode property is set to compress, the appropriate MIME Type is set. If the Mode " + + "property is set to decompress and the file is successfully decompressed, this attribute is removed, as the MIME Type is no longer known.") public class CompressContent extends AbstractProcessor { public static final String COMPRESSION_FORMAT_ATTRIBUTE = "use mime.type attribute"; @@ -90,7 +93,8 @@ public class CompressContent extends AbstractProcessor { .build(); public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder() .name("Compression Level") - .description("The compression level to use; this is valid only when using GZIP compression. A lower value results in faster processing but less compression; a value of 0 indicates no compression but simply archiving") + .description("The compression level to use; this is valid only when using GZIP compression. 
A lower value results in faster processing " + + "but less compression; a value of 0 indicates no compression but simply archiving") .defaultValue("1") .required(true) .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9") @@ -104,14 +108,21 @@ public class CompressContent extends AbstractProcessor { .build(); public static final PropertyDescriptor UPDATE_FILENAME = new PropertyDescriptor.Builder() .name("Update Filename") - .description("If true, will remove the filename extension when decompressing data (only if the extension indicates the appropriate compression format) and add the appropriate extension when compressing data") + .description("If true, will remove the filename extension when decompressing data (only if the extension indicates the appropriate " + + "compression format) and add the appropriate extension when compressing data") .required(true) .allowableValues("true", "false") .defaultValue("false") .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles will be transferred to the failure relationship if they fail to compress/decompress").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles will be transferred to the failure relationship if they fail to compress/decompress") + .build(); private List<PropertyDescriptor> properties; private Set<Relationship> relationships; @@ -135,7 +146,8 @@ public class CompressContent extends 
AbstractProcessor { mimeTypeMap.put("application/gzip", COMPRESSION_FORMAT_GZIP); mimeTypeMap.put("application/bzip2", COMPRESSION_FORMAT_BZIP2); mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA); - this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap); + this.compressionFormatMimeTypeMap = Collections. + unmodifiableMap(mimeTypeMap); } @Override @@ -274,7 +286,8 @@ public class CompressContent extends AbstractProcessor { final long sizeAfterCompression = flowFile.getSize(); if (MODE_DECOMPRESS.equalsIgnoreCase(compressionMode)) { - flowFile = session.removeAttribute(flowFile, CoreAttributes.MIME_TYPE.key()); + flowFile = session. + removeAttribute(flowFile, CoreAttributes.MIME_TYPE.key()); if (context.getProperty(UPDATE_FILENAME).asBoolean()) { final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); @@ -283,7 +296,8 @@ public class CompressContent extends AbstractProcessor { } } } else { - flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get()); + flowFile = session. 
+ putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get()); if (context.getProperty(UPDATE_FILENAME).asBoolean()) { final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); @@ -291,8 +305,8 @@ public class CompressContent extends AbstractProcessor { } } - logger.info("Successfully {}ed {} using {} compression format; size changed from {} to {} bytes", new Object[]{ - compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression}); + logger.info("Successfully {}ed {} using {} compression format; size changed from {} to {} bytes", + new Object[]{compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression}); session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getDuration(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); } catch (final ProcessException e) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index 83febb5..10cd45d 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -71,13 +71,16 @@ public class ControlRate extends AbstractProcessor { .build(); public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder() .name("Maximum Rate") - .description("The 
maximum rate at which data should pass through this processor. The format of this property is expected to be a positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.") + .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a " + + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria .build(); public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() .name("Rate Controlled Attribute") - .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. The value of the attribute referenced by this property must be a positive integer, or the FlowFile will be routed to failure. This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.") + .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. " + + "The value of the attribute referenced by this property must be a positive integer, or the FlowFile will be routed to failure. " + + "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) @@ -91,14 +94,21 @@ public class ControlRate extends AbstractProcessor { .build(); public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() .name("Grouping Attribute") - .description("By default, a single \"throttle\" is used for all FlowFiles. 
If this value is specified, a separate throttle is used for each value specified by the attribute with this name. Changing this value resets the rate counters.") + .description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for " + + "each value specified by the attribute with this name. Changing this value resets the rate counters.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are transferred to this relationship").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles are transferred to this relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format") + .build(); private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*"); private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###"; @@ -146,7 +156,10 @@ public class ControlRate extends AbstractProcessor { rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR; final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); if (rateAttr == null) { - validationResults.add(new ValidationResult.Builder().subject(RATE_CONTROL_ATTRIBUTE_NAME.getName()).explanation("<Rate Controlled Attribute> property must be set if 
using <Rate Control Criteria> of 'attribute value'").build()); + validationResults.add(new ValidationResult.Builder(). + subject(RATE_CONTROL_ATTRIBUTE_NAME.getName()). + explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'"). + build()); } break; case FLOWFILE_RATE: @@ -167,7 +180,10 @@ public class ControlRate extends AbstractProcessor { public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { super.onPropertyModified(descriptor, oldValue, newValue); - if (descriptor.equals(RATE_CONTROL_CRITERIA) || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME) || descriptor.equals(GROUPING_ATTRIBUTE_NAME) || descriptor.equals(TIME_PERIOD)) { + if (descriptor.equals(RATE_CONTROL_CRITERIA) + || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME) + || descriptor.equals(GROUPING_ATTRIBUTE_NAME) + || descriptor.equals(TIME_PERIOD)) { // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map. throttleMap.clear(); } else if (descriptor.equals(MAX_RATE)) { @@ -227,8 +243,7 @@ public class ControlRate extends AbstractProcessor { case ATTRIBUTE_RATE: final String attributeValue = flowFile.getAttribute(rateControlAttributeName); if (attributeValue == null) { - logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", - new Object[]{flowFile, rateControlAttributeName}); + logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[]{flowFile, rateControlAttributeName}); session.transfer(flowFile, REL_FAILURE); return; } @@ -266,7 +281,8 @@ public class ControlRate extends AbstractProcessor { throttle.lock(); try { if (throttle.tryAdd(rateValue)) { - logger.info("transferring {} to 'success'", new Object[]{flowFile}); + logger. 
+ info("transferring {} to 'success'", new Object[]{flowFile}); session.transfer(flowFile, REL_SUCCESS); } else { flowFile = session.penalize(flowFile); @@ -361,7 +377,8 @@ public class ControlRate extends AbstractProcessor { return false; } - logger.debug("current sum for throttle is {}, so allowing rate of {} through", new Object[]{sum == null ? 0 : sum.getValue(), value}); + logger.debug("current sum for throttle is {}, so allowing rate of {} through", + new Object[]{sum == null ? 0 : sum.getValue(), value}); final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue(); if (transferred > maxRateValue) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java index 119a3f2..c8d22d3 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java @@ -45,33 +45,24 @@ import java.util.concurrent.TimeUnit; /** * <p> - * This processor reads files in as text according to the specified character - * set and it outputs another text file according to the given characeter set. - * The character sets supported depend on the version of the JRE and is platform - * specific. In addition, the JVM can be expanded with additional character sets - * to support. 
More information on which character sets are supported can be - * found in the JDK documentation under the docs directory in the following - * path: ....\technotes\guides\intl\encoding.doc.html</p> + * This processor reads files in as text according to the specified character set and it outputs another text file according to the given character set. The character sets supported depend on the + * version of the JRE and are platform specific. In addition, the JVM can be expanded with additional character sets to support. More information on which character sets are supported can be found in + * the JDK documentation under the docs directory in the following path: ....\technotes\guides\intl\encoding.doc.html</p> * * <p> - * The conversion process is very passive. For conversions that do not map - * perfectly the conversion will replace unmappable or unrecognized input using - * the '?' character.
* * <p> - * The following properties are required: <ul> <li><b>input.charset</b> - The - * character set of the original file contents</li> <li><b>output.charset</b> - - * The character set of the resulting file</li> </ul> </p> + * The following properties are required: <ul> <li><b>input.charset</b> - The character set of the original file contents</li> <li><b>output.charset</b> - The character set of the resulting file</li> + * </ul> </p> * * <p> * The following properties are optional: <ul> <li><b>N/A</b> - </li> </ul> * </p> * * <p> - * The following relationships are required: <ul> <li><b>success</b> - the id of - * the processor to transfer successfully converted files</li> - * <li><b>failure</b> - the id of the processor to transfer unsuccessfully - * converted files</li> </ul> </p> + * The following relationships are required: <ul> <li><b>success</b> - the id of the processor to transfer successfully converted files</li> + * <li><b>failure</b> - the id of the processor to transfer unsuccessfully converted files</li> </ul> </p> */ @EventDriven @SideEffectFree @@ -126,8 +117,12 @@ public class ConvertCharacterSet extends AbstractProcessor { public void onTrigger(final ProcessContext context, final ProcessSession session) { final ProcessorLog logger = getLogger(); - final Charset inputCharset = Charset.forName(context.getProperty(INPUT_CHARSET).getValue()); - final Charset outputCharset = Charset.forName(context.getProperty(OUTPUT_CHARSET).getValue()); + final Charset inputCharset = Charset.forName(context. + getProperty(INPUT_CHARSET). + getValue()); + final Charset outputCharset = Charset.forName(context. + getProperty(OUTPUT_CHARSET). 
+ getValue()); final CharBuffer charBuffer = CharBuffer.allocate(MAX_BUFFER_SIZE); final CharsetDecoder decoder = inputCharset.newDecoder(); @@ -164,8 +159,8 @@ public class ConvertCharacterSet extends AbstractProcessor { }); session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - logger.info("successfully converted characters from {} to {} for {}", new Object[]{ - context.getProperty(INPUT_CHARSET).getValue(), context.getProperty(OUTPUT_CHARSET).getValue(), flowFile}); + logger.info("successfully converted characters from {} to {} for {}", + new Object[]{context.getProperty(INPUT_CHARSET).getValue(), context.getProperty(OUTPUT_CHARSET).getValue(), flowFile}); session.transfer(flowFile, REL_SUCCESS); } catch (final Exception e) { throw new ProcessException(e); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java index cd0a21b..5e1fffa 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java @@ -56,8 +56,10 @@ import org.apache.nifi.processor.util.StandardValidators; + "If so, routes the FlowFile to 'duplicate' with an attribute named 'original.identifier' that specifies the original FlowFile's" + "\"description\", which is specified in the <FlowFile Description> property. 
If the FlowFile is not determined to be a duplicate, the Processor " + "routes the FlowFile to 'non-duplicate'") -@WritesAttribute(attribute="original.flowfile.description", description="All FlowFiles routed to the duplicate relationship will have an attribute added named original.flowfile.description. The value of this attribute is determined by the attributes of the original copy of the data and by the FlowFile Description property.") -@SeeAlso(classNames={"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer"}) +@WritesAttribute(attribute = "original.flowfile.description", description = "All FlowFiles routed to the duplicate relationship will have " + + "an attribute added named original.flowfile.description. The value of this attribute is determined by the attributes of the original " + + "copy of the data and by the FlowFile Description property.") +@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer"}) public class DetectDuplicate extends AbstractProcessor { public static final String ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME = "original.flowfile.description"; @@ -80,8 +82,8 @@ public class DetectDuplicate extends AbstractProcessor { .build(); public static final PropertyDescriptor FLOWFILE_DESCRIPTION = new PropertyDescriptor.Builder() .name("FlowFile Description") - .description( - "When a FlowFile is added to the cache, this value is stored along with it so that if a duplicate is found, this description of the original FlowFile will be added to the duplicate's \"" + .description("When a FlowFile is added to the cache, this value is stored along with it so that if a duplicate is found, this " + + "description of the original FlowFile will be added to the duplicate's \"" + ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME + "\" attribute") .required(true) 
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) @@ -96,12 +98,18 @@ public class DetectDuplicate extends AbstractProcessor { .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); - public static final Relationship REL_DUPLICATE = new Relationship.Builder().name("duplicate") - .description("If a FlowFile has been detected to be a duplicate, it will be routed to this relationship").build(); - public static final Relationship REL_NON_DUPLICATE = new Relationship.Builder().name("non-duplicate") - .description("If a FlowFile's Cache Entry Identifier was not found in the cache, it will be routed to this relationship").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") - .description("If unable to communicate with the cache, the FlowFile will be penalized and routed to this relationship").build(); + public static final Relationship REL_DUPLICATE = new Relationship.Builder() + .name("duplicate") + .description("If a FlowFile has been detected to be a duplicate, it will be routed to this relationship") + .build(); + public static final Relationship REL_NON_DUPLICATE = new Relationship.Builder() + .name("non-duplicate") + .description("If a FlowFile's Cache Entry Identifier was not found in the cache, it will be routed to this relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If unable to communicate with the cache, the FlowFile will be penalized and routed to this relationship") + .build(); private final Set<Relationship> relationships; private final Serializer<String> keySerializer = new StringSerializer(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java 
---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java index 920041b..585ba1d 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java @@ -63,9 +63,11 @@ import org.apache.nifi.processor.util.StandardValidators; + "strategy, the default is to assign each destination a weighting of 1 (evenly distributed). However, optional properties" + "can be added to the change this; adding a property with the name '5' and value '10' means that the relationship with name " + "'5' will be receive 10 FlowFiles in each iteration instead of 1.") -@DynamicProperty(name="The relationship name(positive number)", value="The relationship Weight(positive number)", description="adding a property with the name '5' and value '10' means that the relationship with name " +@DynamicProperty(name = "The relationship name(positive number)", value = "The relationship Weight(positive number)", description = "adding a " + + "property with the name '5' and value '10' means that the relationship with name " + "'5' will be receive 10 FlowFiles in each iteration instead of 1.") -@DynamicRelationship(name="A number 1..<Number Of Relationships>", description="FlowFiles are sent to this relationship per the <Distribution Strategy>") +@DynamicRelationship(name = "A number 1..<Number Of Relationships>", description = "FlowFiles are sent to this relationship per the " + + "<Distribution Strategy>") public class DistributeLoad extends AbstractProcessor { public static final 
String STRATEGY_ROUND_ROBIN = "round robin"; @@ -81,8 +83,9 @@ public class DistributeLoad extends AbstractProcessor { .build(); public static final PropertyDescriptor DISTRIBUTION_STRATEGY = new PropertyDescriptor.Builder() .name("Distribution Strategy") - .description( - "Determines how the load will be distributed. If using Round Robin, will not distribute any FlowFiles unless all destinations can accept FlowFiles; when using Next Available, will distribute FlowFiles as long as at least 1 destination can accept FlowFiles.") + .description("Determines how the load will be distributed. If using Round Robin, will not distribute any FlowFiles unless all " + + "destinations can accept FlowFiles; when using Next Available, will distribute FlowFiles as long as at least 1 " + + "destination can accept FlowFiles.") .required(true) .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE) .defaultValue(STRATEGY_ROUND_ROBIN) @@ -96,43 +99,29 @@ public class DistributeLoad extends AbstractProcessor { @Override public ValidationResult validate(String subject, String input, ValidationContext context) { - ValidationResult result = new ValidationResult.Builder() - .subject(subject) - .valid(true) - .input(input) - .explanation("Good FQDNs") - .build(); + ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).explanation("Good FQDNs").build(); if (null == input) { - result = new ValidationResult.Builder() - .subject(subject) - .input(input) - .valid(false) - .explanation("Need to specify delimited list of FQDNs") - .build(); + result = new ValidationResult.Builder().subject(subject).input(input).valid(false) + .explanation("Need to specify delimited list of FQDNs").build(); return result; } String[] hostNames = input.split("(?:,+|;+|\\s+)"); for (String hostName : hostNames) { if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) { - result = new ValidationResult.Builder() - 
.subject(subject) - .input(input) - .valid(false) - .explanation("Need a FQDN rather than a simple host name.") - .build(); + result = new ValidationResult.Builder().subject(subject).input(input).valid(false) + .explanation("Need a FQDN rather than a simple host name.").build(); return result; } } return result; } - }) - .build(); + }).build(); public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder() - .name("Load Distribution Service ID") - .description("The identifier of the Load Distribution Service") - .required(true) - .identifiesControllerService(LoadDistributionService.class) - .build(); + .name("Load Distribution Service ID"). + description("The identifier of the Load Distribution Service"). + required(true). + identifiesControllerService(LoadDistributionService.class). + build(); private List<PropertyDescriptor> properties; private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>(); @@ -155,7 +144,8 @@ public class DistributeLoad extends AbstractProcessor { } private static Relationship createRelationship(final int num) { - return new Relationship.Builder().name(String.valueOf(num)).description("Where to route flowfiles for this relationship index").build(); + return new Relationship.Builder().name(String.valueOf(num)) + .description("Where to route flowfiles for this relationship index").build(); } @Override @@ -210,12 +200,12 @@ public class DistributeLoad extends AbstractProcessor { try { final int value = Integer.parseInt(propertyDescriptorName); if (value <= 0 || value > numRelationships) { - return new PropertyDescriptor.Builder().addValidator(new InvalidPropertyNameValidator(propertyDescriptorName)) - .name(propertyDescriptorName).build(); + return new PropertyDescriptor.Builder() + .addValidator(new InvalidPropertyNameValidator(propertyDescriptorName)).name(propertyDescriptorName).build(); } } catch (final NumberFormatException e) { - return new 
PropertyDescriptor.Builder().addValidator(new InvalidPropertyNameValidator(propertyDescriptorName)) - .name(propertyDescriptorName).build(); + return new PropertyDescriptor.Builder() + .addValidator(new InvalidPropertyNameValidator(propertyDescriptorName)).name(propertyDescriptorName).build(); } // validate that the property value is valid @@ -232,19 +222,15 @@ public class DistributeLoad extends AbstractProcessor { // make sure Hostnames and Controller service are set PropertyValue propDesc = validationContext.getProperty(HOSTNAMES); if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) { - results.add(new ValidationResult.Builder() - .subject(HOSTNAMES.getName()) - .explanation("Must specify Hostnames when using 'Load Distribution Strategy'") - .valid(false) - .build()); + results.add(new ValidationResult.Builder().subject(HOSTNAMES.getName()) + .explanation("Must specify Hostnames when using 'Load Distribution Strategy'").valid(false).build()); } propDesc = validationContext.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE); if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) { results.add(new ValidationResult.Builder() .subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName()) .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Strategy'") - .valid(false) - .build()); + .valid(false).build()); } if (results.isEmpty()) { int numRels = validationContext.getProperty(NUM_RELATIONSHIPS).asInteger(); @@ -260,13 +246,13 @@ public class DistributeLoad extends AbstractProcessor { results.add(new ValidationResult.Builder() .subject("Number of Relationships and Hostnames") .explanation("Number of Relationships must be equal to, or greater than, the number of host names") - .valid(false) - .build()); + .valid(false).build()); } else { // create new relationships with descriptions of hostname Set<Relationship> relsWithDesc = new TreeSet<>(); for (int i = 0; i < numHosts; i++) { - 
relsWithDesc.add(new Relationship.Builder().name(String.valueOf(i + 1)).description(hostNames[i]).build()); + relsWithDesc.add(new Relationship.Builder().name(String.valueOf(i + 1)) + .description(hostNames[i]).build()); } // add add'l rels if configuration requires it...it probably shouldn't for (int i = numHosts + 1; i <= numRels; i++) { @@ -341,7 +327,8 @@ public class DistributeLoad extends AbstractProcessor { final List<Relationship> relationshipList = new ArrayList<>(); for (final Map.Entry<Integer, Integer> entry : weightings.entrySet()) { final String relationshipName = String.valueOf(entry.getKey()); - final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); + final Relationship relationship = new Relationship.Builder(). + name(relationshipName).build(); for (int i = 0; i < entry.getValue(); i++) { relationshipList.add(relationship); } @@ -389,8 +376,7 @@ public class DistributeLoad extends AbstractProcessor { public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) { return new ValidationResult.Builder().subject("Property Name").input(propertyName) .explanation("Property Name must be a positive integer between 1 and the number of relationships (inclusive)") - .valid(false) - .build(); + .valid(false).build(); } } @@ -400,12 +386,9 @@ public class DistributeLoad extends AbstractProcessor { private static interface DistributionStrategy { /** - * Returns a mapping of FlowFile to Relationship or <code>null</code> if - * the needed relationships are not available to accept files. 
- * - * @param session - * @param flowFiles - * @return + * @param session session + * @param flowFiles flowFile + * @return a mapping of FlowFile to Relationship or <code>null</code> if the needed relationships are not available to accept files */ Relationship mapToRelationship(ProcessContext context, FlowFile flowFile); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java index 2465b56..3e26857 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java @@ -65,7 +65,7 @@ public class EncodeContent extends AbstractProcessor { // List of support encodings. 
public static final String BASE64_ENCODING = "base64"; public static final String BASE32_ENCODING = "base32"; - public static final String HEX_ENCODING = "hex"; + public static final String HEX_ENCODING = "hex"; public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() .name("Mode") @@ -83,8 +83,14 @@ public class EncodeContent extends AbstractProcessor { .defaultValue(BASE64_ENCODING) .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully encoded or decoded will be routed to success").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be encoded or decoded will be routed to failure").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any FlowFile that is successfully encoded or decoded will be routed to success") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be encoded or decoded will be routed to failure") + .build(); private List<PropertyDescriptor> properties; private Set<Relationship> relationships; @@ -163,6 +169,7 @@ public class EncodeContent extends AbstractProcessor { } private class EncodeBase64 implements StreamCallback { + @Override public void process(InputStream in, OutputStream out) throws IOException { try (Base64OutputStream bos = new Base64OutputStream(out)) { @@ -172,6 +179,7 @@ public class EncodeContent extends AbstractProcessor { } private class DecodeBase64 implements StreamCallback { + @Override public void process(InputStream in, OutputStream out) throws IOException { try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) { @@ -181,6 +189,7 @@ public class EncodeContent extends AbstractProcessor { } private class EncodeBase32 implements 
StreamCallback { + @Override public void process(InputStream in, OutputStream out) throws IOException { try (Base32OutputStream bos = new Base32OutputStream(out)) { @@ -190,6 +199,7 @@ public class EncodeContent extends AbstractProcessor { } private class DecodeBase32 implements StreamCallback { + @Override public void process(InputStream in, OutputStream out) throws IOException { try (Base32InputStream bis = new Base32InputStream(new ValidatingBase32InputStream(in))) { @@ -198,10 +208,10 @@ public class EncodeContent extends AbstractProcessor { } } - private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', - '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; + private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; private class EncodeHex implements StreamCallback { + @Override public void process(InputStream in, OutputStream out) throws IOException { int len; @@ -209,16 +219,17 @@ public class EncodeContent extends AbstractProcessor { byte[] outBuf = new byte[inBuf.length * 2]; while ((len = in.read(inBuf)) > 0) { for (int i = 0; i < len; i++) { - outBuf[i*2] = HEX_CHARS[(inBuf[i] & 0xF0) >>> 4]; - outBuf[i*2 +1] = HEX_CHARS[inBuf[i] & 0x0F]; + outBuf[i * 2] = HEX_CHARS[(inBuf[i] & 0xF0) >>> 4]; + outBuf[i * 2 + 1] = HEX_CHARS[inBuf[i] & 0x0F]; } - out.write(outBuf, 0, len*2); + out.write(outBuf, 0, len * 2); } out.flush(); } } private class DecodeHex implements StreamCallback { + @Override public void process(InputStream in, OutputStream out) throws IOException { int len; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java index 69cb18e..c0f6301 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java @@ -38,7 +38,6 @@ import org.apache.nifi.util.StopWatch; import org.bouncycastle.jce.provider.BouncyCastleProvider; -import javax.crypto.*; import javax.crypto.spec.PBEKeySpec; import javax.crypto.spec.PBEParameterSpec; @@ -50,8 +49,17 @@ import java.security.InvalidKeyException; import java.security.SecureRandom; import java.security.Security; import java.text.Normalizer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; +import javax.crypto.BadPaddingException; +import javax.crypto.Cipher; +import javax.crypto.IllegalBlockSizeException; +import javax.crypto.SecretKey; +import javax.crypto.SecretKeyFactory; @EventDriven @SideEffectFree @@ -87,14 +95,19 @@ public class EncryptContent extends AbstractProcessor { .sensitive(true) .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully encrypted or decrypted will be routed to success").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any 
FlowFile that is successfully encrypted or decrypted will be routed to success") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure") + .build(); private List<PropertyDescriptor> properties; private Set<Relationship> relationships; static { - // add BouncyCastle encryption providers Security.addProvider(new BouncyCastleProvider()); }