NIFI-271 checkpoint
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b612b6bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b612b6bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b612b6bc Branch: refs/heads/NIFI-271 Commit: b612b6bcd9f2f29a4466360d5fbbeccee62ae650 Parents: afb4fe5 Author: joewitt <joew...@apache.org> Authored: Tue Apr 21 23:39:31 2015 -0400 Committer: joewitt <joew...@apache.org> Committed: Tue Apr 21 23:39:31 2015 -0400 ---------------------------------------------------------------------- .../nifi/processor/util/FlowFileFilters.java | 2 +- .../nifi/processor/util/SSLProperties.java | 8 +- .../nifi/processor/util/StandardValidators.java | 140 +++- .../processor/util/TestStandardValidators.java | 18 +- .../nifi-site-to-site-client/pom.xml | 64 +- .../remote/AbstractCommunicationsSession.java | 9 +- .../org/apache/nifi/remote/Communicant.java | 23 +- .../main/java/org/apache/nifi/remote/Peer.java | 20 +- .../org/apache/nifi/remote/PeerDescription.java | 7 +- .../java/org/apache/nifi/remote/PeerStatus.java | 7 +- .../nifi/remote/RemoteResourceInitiator.java | 48 +- .../org/apache/nifi/remote/Transaction.java | 295 +++---- .../nifi/remote/TransactionCompletion.java | 49 +- .../apache/nifi/remote/TransferDirection.java | 13 +- .../nifi/remote/client/SiteToSiteClient.java | 831 ++++++++++--------- .../remote/client/SiteToSiteClientConfig.java | 170 ++-- .../client/socket/EndpointConnection.java | 15 +- .../client/socket/EndpointConnectionPool.java | 627 +++++++------- .../nifi/remote/client/socket/SocketClient.java | 319 ++++--- .../apache/nifi/remote/codec/FlowFileCodec.java | 15 +- .../remote/codec/StandardFlowFileCodec.java | 33 +- .../remote/exception/HandshakeException.java | 9 +- .../exception/PortNotRunningException.java | 7 +- .../remote/exception/ProtocolException.java | 4 +- .../remote/exception/UnknownPortException.java | 6 +- .../SocketChannelCommunicationsSession.java | 23 +- .../remote/io/socket/SocketChannelInput.java | 11 +- .../remote/io/socket/SocketChannelOutput.java | 11 +- .../SSLSocketChannelCommunicationsSession.java | 23 +- .../io/socket/ssl/SSLSocketChannelInput.java | 9 +- .../io/socket/ssl/SSLSocketChannelOutput.java | 5 +- .../nifi/remote/protocol/ClientProtocol.java | 18 +- .../remote/protocol/CommunicationsInput.java | 5 +- .../remote/protocol/CommunicationsSession.java | 6 +- .../apache/nifi/remote/protocol/DataPacket.java | 33 +- .../protocol/socket/HandshakeProperty.java | 38 +- .../nifi/remote/protocol/socket/Response.java | 11 +- .../remote/protocol/socket/ResponseCode.java | 65 +- .../protocol/socket/SocketClientProtocol.java | 388 ++++----- .../socket/SocketClientTransaction.java | 468 +++++------ .../SocketClientTransactionCompletion.java | 2 +- .../nifi/remote/util/NiFiRestApiUtil.java | 24 +- .../nifi/remote/util/PeerStatusCache.java | 3 +- .../nifi/remote/util/StandardDataPacket.java | 44 +- .../socket/TestEndpointConnectionStatePool.java | 17 +- .../client/socket/TestSiteToSiteClient.java | 42 +- .../nifi/io/nio/AbstractChannelReader.java | 10 +- .../java/org/apache/nifi/io/nio/BufferPool.java | 10 +- .../apache/nifi/io/nio/ChannelDispatcher.java | 14 +- .../org/apache/nifi/io/nio/ChannelListener.java | 7 +- .../nifi/io/nio/DatagramChannelReader.java | 12 +- .../apache/nifi/io/nio/SocketChannelReader.java | 12 +- .../nifi/io/nio/consumer/StreamConsumer.java | 12 +- .../nifi/io/socket/SSLContextFactory.java | 12 +- .../io/socket/ServerSocketConfiguration.java | 6 +- .../nifi/io/socket/SocketConfiguration.java | 6 +- .../org/apache/nifi/io/socket/SocketUtils.java | 6 +- .../io/socket/multicast/MulticastListener.java | 5 +- .../apache/nifi/io/nio/example/TCPClient.java | 3 +- .../org/wali/MinimalLockingWriteAheadLog.java | 70 +- .../src/main/java/org/wali/SerDe.java | 40 +- .../java/org/wali/WriteAheadRepository.java | 16 +- ...kControllerServiceInitializationContext.java | 8 +- .../nifi/util/MockControllerServiceLookup.java | 14 +- .../java/org/apache/nifi/util/MockFlowFile.java | 20 +- .../apache/nifi/util/MockProcessContext.java | 15 +- .../apache/nifi/util/MockProcessSession.java | 33 +- .../MockProcessorInitializationContext.java | 4 +- .../org/apache/nifi/util/MockProcessorLog.java | 124 --- .../nifi/util/MockProvenanceReporter.java | 4 +- .../apache/nifi/util/MockReportingContext.java | 6 +- .../MockReportingInitializationContext.java | 2 +- .../apache/nifi/util/MockValidationContext.java | 20 +- .../org/apache/nifi/util/ReflectionUtils.java | 21 +- .../apache/nifi/util/SharedSessionState.java | 1 - .../nifi/util/StandardProcessorTestRunner.java | 123 ++- .../java/org/apache/nifi/util/TestRunner.java | 428 +++++----- .../util/TestStandardProcessorTestRunner.java | 30 +- .../apache/nifi/documentation/DocGenerator.java | 12 +- .../html/HtmlDocumentationWriter.java | 231 ++---- .../html/HtmlProcessorDocumentationWriter.java | 77 +- .../FullyDocumentedControllerService.java | 51 +- .../example/FullyDocumentedProcessor.java | 156 ++-- .../example/FullyDocumentedReportingTask.java | 32 +- .../documentation/example/NakedProcessor.java | 8 +- .../documentation/example/SampleService.java | 2 +- .../html/HtmlDocumentationWriterTest.java | 94 +-- .../html/ProcessorDocumentationWriterTest.java | 135 ++- .../nifi/documentation/html/XmlValidator.java | 29 +- .../org/apache/nifi/nar/ExtensionManager.java | 14 +- .../org/apache/nifi/nar/NarClassLoader.java | 24 +- .../org/apache/nifi/nar/NarClassLoaders.java | 40 +- .../java/org/apache/nifi/nar/NarUnpacker.java | 14 +- .../java/org/apache/nifi/util/FileUtils.java | 28 +- 94 files changed, 2944 insertions(+), 3082 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java index 1f77093..2d1a407 100644 --- a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java +++ b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java @@ -32,7 +32,7 @@ public class FlowFileFilters { * @param maxSize the maximum size of the group of FlowFiles * @param unit the unit of the <code>maxSize</code> argument * @param maxCount the maximum number of FlowFiles to pull - * @return + * @return filter */ public static FlowFileFilter newSizeBasedFilter(final double maxSize, final DataUnit unit, final int maxCount) { final double maxBytes = DataUnit.B.convert(maxSize, unit); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java index 0d66df5..87d63de 100644 --- a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java +++ b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java @@ -130,7 +130,13 @@ public class SSLProperties { try { final boolean storeValid = CertificateUtils.isStoreValid(file.toURI().toURL(), KeystoreType.valueOf(type), password.toCharArray()); if (!storeValid) { - results.add(new ValidationResult.Builder().subject(keystoreDesc + " Properties").valid(false).explanation("Invalid KeyStore Password or Type specified for file " + filename).build()); + results.add( + new ValidationResult.Builder() + .subject(keystoreDesc + " Properties") + .valid(false) + .explanation("Invalid KeyStore Password or Type specified for file " + filename) + .build() + ); } } catch (MalformedURLException e) { results.add(new ValidationResult.Builder().subject(keystoreDesc + " Properties").valid(false).explanation("Malformed URL from file: " + e).build()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java index d1621ed..37ba7d8 100644 --- a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java +++ b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java @@ -46,10 +46,10 @@ public class StandardValidators { public ValidationResult validate(final String subject, final String input, final ValidationContext context) { final ValidationResult.Builder builder = new ValidationResult.Builder(); builder.subject(subject).input(input); - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { return builder.valid(true).explanation("Contains Expression Language").build(); } - + try { FlowFile.KeyValidator.validateKey(input); builder.valid(true); @@ -66,10 +66,10 @@ public class StandardValidators { public ValidationResult validate(final String subject, final String input, final ValidationContext context) { final ValidationResult.Builder builder = new ValidationResult.Builder(); builder.subject("Property Name").input(subject); - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { return builder.valid(true).explanation("Contains Expression Language").build(); } - + try { FlowFile.KeyValidator.validateKey(subject); builder.valid(true); @@ -84,10 +84,10 @@ public class StandardValidators { public static final Validator POSITIVE_INTEGER_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String value, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); } - + String reason = null; try { final int intVal = Integer.parseInt(value); @@ -106,7 +106,7 @@ public class StandardValidators { public static final Validator POSITIVE_LONG_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String value, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); } @@ -137,7 +137,7 @@ public class StandardValidators { public static final Validator BOOLEAN_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String value, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); } @@ -150,7 +150,7 @@ public class StandardValidators { public static final Validator INTEGER_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String value, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); } @@ -168,7 +168,7 @@ public class StandardValidators { public static final Validator LONG_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String value, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); } @@ -186,7 +186,7 @@ public class StandardValidators { public static final Validator NON_NEGATIVE_INTEGER_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String value, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); } @@ -208,7 +208,7 @@ public class StandardValidators { public static final Validator CHARACTER_SET_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String value, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); } @@ -235,7 +235,7 @@ public class StandardValidators { public static final Validator URI_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); } @@ -253,7 +253,7 @@ public class StandardValidators { public static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); } @@ -270,7 +270,7 @@ public class StandardValidators { public static final Validator TIME_PERIOD_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); } @@ -280,7 +280,14 @@ public class StandardValidators { if (Pattern.compile(FormatUtils.TIME_DURATION_REGEX).matcher(input.toLowerCase()).matches()) { return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); } else { - return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days").build(); + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Must be of format <duration> <TimeUnit> where <duration> is a " + + "non-negative integer and TimeUnit is a supported Time Unit, such " + + "as: nanos, millis, secs, mins, hrs, days") + .build(); } } }; @@ -288,17 +295,28 @@ public class StandardValidators { public static final Validator DATA_SIZE_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); } if (input == null) { - return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Data Size cannot be null").build(); + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Data Size cannot be null") + .build(); } if (Pattern.compile(DataUnit.DATA_SIZE_REGEX).matcher(input.toUpperCase()).matches()) { return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); } else { - return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Must be of format <Data Size> <Data Unit> where <Data Size> is a non-negative integer and <Data Unit> is a supported Data Unit, such as: B, KB, MB, GB, TB").build(); + return new ValidationResult.Builder() + .subject(subject).input(input) + .valid(false) + .explanation("Must be of format <Data Size> <Data Unit> where <Data Size>" + + " is a non-negative integer and <Data Unit> is a supported Data" + + " Unit, such as: B, KB, MB, GB, TB") + .build(); } } }; @@ -318,7 +336,7 @@ public class StandardValidators { return new Validator() { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); } @@ -346,19 +364,19 @@ public class StandardValidators { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); } final ValidationResult vr = DATA_SIZE_VALIDATOR.validate(subject, input, context); - if(!vr.isValid()){ + if (!vr.isValid()) { return vr; } final long dataSizeBytes = DataUnit.parseDataSize(input, DataUnit.B).longValue(); - if(dataSizeBytes < minBytesInclusive){ + if (dataSizeBytes < minBytesInclusive) { return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be smaller than " + minBytesInclusive + " bytes").build(); } - if(dataSizeBytes > maxBytesInclusive){ + if (dataSizeBytes > maxBytesInclusive) { return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be larger than " + maxBytesInclusive + " bytes").build(); } return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); @@ -371,7 +389,7 @@ public class StandardValidators { return new Validator() { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); } @@ -397,10 +415,11 @@ public class StandardValidators { * Language will not support FlowFile Attributes but only System/JVM * Properties * - * @param minCapturingGroups - * @param maxCapturingGroups - * @param supportAttributeExpressionLanguage - * @return + * @param minCapturingGroups minimum capturing groups allowed + * @param maxCapturingGroups maximum capturing groups allowed + * @param supportAttributeExpressionLanguage whether or not to support + * expression language + * @return validator */ public static Validator createRegexValidator(final int minCapturingGroups, final int maxCapturingGroups, final boolean supportAttributeExpressionLanguage) { return new Validator() { @@ -412,7 +431,12 @@ public class StandardValidators { try { substituted = context.newPropertyValue(value).evaluateAttributeExpressions().getValue(); } catch (final Exception e) { - return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation("Failed to evaluate the Attribute Expression Language due to " + e.toString()).build(); + return new ValidationResult.Builder() + .subject(subject) + .input(value) + .valid(false) + .explanation("Failed to evaluate the Attribute Expression Language due to " + e.toString()) + .build(); } } else { substituted = value; @@ -421,12 +445,22 @@ public class StandardValidators { final Pattern pattern = Pattern.compile(substituted); final int numGroups = pattern.matcher("").groupCount(); if (numGroups < minCapturingGroups || numGroups > maxCapturingGroups) { - return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation("RegEx is required to have between " + minCapturingGroups + " and " + maxCapturingGroups + " Capturing Groups but has " + numGroups).build(); + return new ValidationResult.Builder() + .subject(subject) + .input(value) + .valid(false) + .explanation("RegEx is required to have between " + minCapturingGroups + " and " + maxCapturingGroups + " Capturing Groups but has " + numGroups) + .build(); } return new ValidationResult.Builder().subject(subject).input(value).valid(true).build(); } catch (final Exception e) { - return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation("Not a valid Java Regular Expression").build(); + return new ValidationResult.Builder() + .subject(subject) + .input(value) + .valid(false) + .explanation("Not a valid Java Regular Expression") + .build(); } } @@ -444,7 +478,12 @@ public class StandardValidators { final ResultType resultType = allowExtraCharacters ? ResultType.STRING : context.newExpressionLanguageCompiler().getResultType(input); if (!resultType.equals(expectedResultType)) { - return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Expected Attribute Query to return type " + expectedResultType + " but query returns type " + resultType).build(); + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Expected Attribute Query to return type " + expectedResultType + " but query returns type " + resultType) + .build(); } return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); @@ -456,7 +495,7 @@ public class StandardValidators { return new Validator() { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); } @@ -502,7 +541,7 @@ public class StandardValidators { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); } @@ -523,7 +562,8 @@ public class StandardValidators { } } else { builder.subject(subject).input(input).valid(false) - .explanation("Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days"); + .explanation("Must be of format <duration> <TimeUnit> where <duration> is a non-negative " + + "integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days"); } return builder.build(); } @@ -539,7 +579,7 @@ public class StandardValidators { @Override public ValidationResult validate(final String subject, final String value, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); } @@ -574,7 +614,7 @@ public class StandardValidators { @Override public ValidationResult validate(final String subject, final String value, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); } @@ -617,17 +657,19 @@ public class StandardValidators { /** * Creates a validator based on existence of a {@link ControllerService}. - * - * @param serviceClass the controller service API your {@link ConfigurableComponent} depends on + * + * @param serviceClass the controller service API your + * {@link ConfigurableComponent} depends on * @return a Validator - * @deprecated As of release 0.1.0-incubating, replaced by {@link org.apache.nifi.components.PropertyDescriptor.Builder#identifiesControllerService(Class)} + * @deprecated As of release 0.1.0-incubating, replaced by + * {@link org.apache.nifi.components.PropertyDescriptor.Builder#identifiesControllerService(Class)} */ @Deprecated public static Validator createControllerServiceExistsValidator(final Class<? extends ControllerService> serviceClass) { return new Validator() { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); } @@ -638,14 +680,24 @@ public class StandardValidators { } if (!serviceClass.isAssignableFrom(svc.getClass())) { - return new ValidationResult.Builder().valid(false).input(input).subject(subject).explanation("Controller Service with this ID is of type " + svc.getClass().getName() + " but is expected to be of type " + serviceClass.getName()).build(); + return new ValidationResult.Builder() + .valid(false) + .input(input) + .subject(subject) + .explanation("Controller Service with this ID is of type " + svc.getClass().getName() + " but is expected to be of type " + serviceClass.getName()) + .build(); } final ValidationContext serviceValidationContext = context.getControllerServiceValidationContext(svc); final Collection<ValidationResult> serviceValidationResults = svc.validate(serviceValidationContext); for (final ValidationResult result : serviceValidationResults) { if (!result.isValid()) { - return new ValidationResult.Builder().valid(false).input(input).subject(subject).explanation("Controller Service " + input + " is not valid: " + result.getExplanation()).build(); + return new ValidationResult.Builder() + .valid(false) + .input(input) + .subject(subject) + .explanation("Controller Service " + input + " is not valid: " + result.getExplanation()) + .build(); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java b/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java index a8f4bae..bcd402d 100644 --- a/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java +++ b/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java @@ -35,7 +35,7 @@ public class TestStandardValidators { ValidationResult vr; final ValidationContext validationContext = Mockito.mock(ValidationContext.class); - + vr = val.validate("TimePeriodTest", "0 sense made", validationContext); assertFalse(vr.isValid()); @@ -54,22 +54,22 @@ public class TestStandardValidators { vr = val.validate("TimePeriodTest", "1 sec", validationContext); assertTrue(vr.isValid()); } - + @Test public void testDataSizeBoundsValidator() { Validator val = StandardValidators.createDataSizeBoundsValidator(100, 1000); - ValidationResult vr; - + ValidationResult vr; + final ValidationContext validationContext = Mockito.mock(ValidationContext.class); vr = val.validate("DataSizeBounds", "5 GB", validationContext); assertFalse(vr.isValid()); - + vr = val.validate("DataSizeBounds", "0 B", validationContext); assertFalse(vr.isValid()); vr = val.validate("DataSizeBounds", "99 B", validationContext); assertFalse(vr.isValid()); - + vr = val.validate("DataSizeBounds", "100 B", validationContext); assertTrue(vr.isValid()); @@ -78,12 +78,12 @@ public class TestStandardValidators { vr = val.validate("DataSizeBounds", "1000 B", validationContext); assertTrue(vr.isValid()); - + vr = val.validate("DataSizeBounds", "1001 B", validationContext); assertFalse(vr.isValid()); - + vr = val.validate("DataSizeBounds", "water", validationContext); assertFalse(vr.isValid()); - + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml index 5d3d93e..c024be6 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml +++ b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml @@ -14,41 +14,41 @@ limitations under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> + <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-commons</artifactId> - <version>0.1.0-incubating-SNAPSHOT</version> - </parent> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons</artifactId> + <version>0.1.0-incubating-SNAPSHOT</version> + </parent> - <artifactId>nifi-site-to-site-client</artifactId> + <artifactId>nifi-site-to-site-client</artifactId> - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-utils</artifactId> - </dependency> - <dependency> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - <version>1.9.13</version> - </dependency> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.13</version> + </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-client-dto</artifactId> - <version>0.1.0-incubating-SNAPSHOT</version> - </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-client-dto</artifactId> + <version>0.1.0-incubating-SNAPSHOT</version> + </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - </dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java index 4babb92..dacfd64 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java @@ -19,14 +19,15 @@ package org.apache.nifi.remote; import org.apache.nifi.remote.protocol.CommunicationsSession; public abstract class AbstractCommunicationsSession implements CommunicationsSession { + private String userDn; - + private volatile String uri; - + public AbstractCommunicationsSession(final String uri) { this.uri = uri; } - + @Override public String toString() { return uri; @@ -46,7 +47,7 @@ public abstract class AbstractCommunicationsSession implements CommunicationsSes public String getUserDn() { return userDn; } - + @Override public void setUserDn(final String dn) { this.userDn = dn; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java index ac2d498..17b990e 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java @@ -20,28 +20,27 @@ package org.apache.nifi.remote; * Represents the remote entity that the client is communicating with */ public interface Communicant { + /** - * Returns the NiFi site-to-site URL for the remote NiFi instance - * @return + * @return the NiFi site-to-site URL for the remote NiFi instance */ String getUrl(); - + /** - * The Host of the remote NiFi instance - * @return + * @return The Host of the remote NiFi instance */ String getHost(); - + /** - * The Port that the remote NiFi instance is listening on for site-to-site communications - * @return + * @return The Port that the remote NiFi instance is listening on for + * site-to-site communications */ int getPort(); - + /** - * The distinguished name that the remote NiFi instance has provided in its certificate if - * using secure communications, or <code>null</code> if the Distinguished Name is unknown - * @return + * @return The distinguished name that the remote NiFi instance has provided + * in its certificate if using secure communications, or <code>null</code> + * if the Distinguished Name is unknown */ String getDistinguishedName(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java index 2428078..5cb37b0 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java @@ -31,7 +31,7 @@ public class Peer implements Communicant { private final String clusterUrl; private final String host; private final int port; - + private final Map<String, Long> penaltyExpirationMap = new HashMap<>(); private boolean closed = false; @@ -53,14 +53,14 @@ public class Peer implements Communicant { public PeerDescription getDescription() { return description; } - + @Override public String getUrl() { return url; } - + public String getClusterUrl() { - return clusterUrl; + return clusterUrl; } public CommunicationsSession getCommunicationsSession() { @@ -79,24 +79,24 @@ public class Peer implements Communicant { } /** - * Penalizes this peer for the given destination only for the provided number of milliseconds - * @param destinationId - * @param millis + * Penalizes this peer for the given destination only for the provided + * number of milliseconds + * + * @param destinationId id of destination + * @param millis period of time to penalize peer */ public void penalize(final String destinationId, final long millis) { final Long currentPenalty = penaltyExpirationMap.get(destinationId); final long proposedPenalty = System.currentTimeMillis() + millis; - if ( currentPenalty == null || proposedPenalty > currentPenalty ) { + if (currentPenalty == null || proposedPenalty > currentPenalty) { penaltyExpirationMap.put(destinationId, proposedPenalty); } } - public boolean isPenalized(final String destinationId) { final Long currentPenalty = penaltyExpirationMap.get(destinationId); return (currentPenalty != null && currentPenalty > System.currentTimeMillis()); } - public boolean isClosed() { return closed; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java index 0e8e498..6fc90e4 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java @@ -17,10 +17,11 @@ package org.apache.nifi.remote; public class PeerDescription { + private final String hostname; private final int port; private final boolean secure; - + public PeerDescription(final String hostname, final int port, final boolean secure) { this.hostname = hostname; this.port = port; @@ -64,7 +65,7 @@ public class PeerDescription { if (getClass() != obj.getClass()) { return false; } - + final PeerDescription other = (PeerDescription) obj; if (hostname == null) { if (other.hostname != null) { @@ -73,7 +74,7 @@ public class PeerDescription { } else if (!hostname.equals(other.hostname)) { return false; } - + return port == other.port; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java index b68ac33..6c8a4ec 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java @@ -17,6 +17,7 @@ package org.apache.nifi.remote; public class PeerStatus { + private final PeerDescription description; private final int numFlowFiles; @@ -28,15 +29,15 @@ public class PeerStatus { public PeerDescription getPeerDescription() { return description; } - + public int getFlowFileCount() { return numFlowFiles; } @Override public String toString() { - return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort() + - ",secure=" + description.isSecure() + ",flowFileCount=" + numFlowFiles + "]"; + return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort() + + ",secure=" + description.isSecure() + ",flowFileCount=" + numFlowFiles + "]"; } @Override http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java index f469724..582916e 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java @@ -25,49 +25,51 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class RemoteResourceInitiator { - public static final int RESOURCE_OK = 20; - public static final int DIFFERENT_RESOURCE_VERSION = 21; - public static final int ABORT = 255; - private static final Logger logger = LoggerFactory.getLogger(RemoteResourceInitiator.class); - - public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException { + public static final int RESOURCE_OK = 20; + public static final int DIFFERENT_RESOURCE_VERSION = 21; + public static final int ABORT = 255; + + private static final Logger logger = LoggerFactory.getLogger(RemoteResourceInitiator.class); + + public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) + throws IOException, HandshakeException { // Write the classname of the RemoteStreamCodec, followed by its version - logger.debug("Negotiating resource; proposal is {}", resource); - dos.writeUTF(resource.getResourceName()); - final VersionNegotiator negotiator = resource.getVersionNegotiator(); - dos.writeInt(negotiator.getVersion()); - dos.flush(); - + logger.debug("Negotiating resource; proposal is {}", resource); + dos.writeUTF(resource.getResourceName()); + final VersionNegotiator negotiator = resource.getVersionNegotiator(); + dos.writeInt(negotiator.getVersion()); + dos.flush(); + // wait for response from server. - logger.debug("Receiving response from remote instance"); + logger.debug("Receiving response from remote instance"); final int statusCode = dis.read(); switch (statusCode) { - case RESOURCE_OK: // server accepted our proposal of codec name/version + case RESOURCE_OK: // server accepted our proposal of codec name/version logger.debug("Response was RESOURCE_OK"); return resource; - case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of codec name but not the version + case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of codec name but not the version logger.debug("Response was DIFFERENT_RESOURCE_VERSION"); // Get server's preferred version - final int newVersion = dis.readInt(); - + final int newVersion = dis.readInt(); + // Determine our new preferred version that is no greater than the server's preferred version. final Integer newPreference = negotiator.getPreferredVersion(newVersion); // If we could not agree with server on a version, fail now. - if ( newPreference == null ) { + if (newPreference == null) { throw new HandshakeException("Could not agree on version for " + resource); } - + negotiator.setVersion(newPreference); - + // Attempt negotiation of resource based on our new preferred version. return initiateResourceNegotiation(resource, dis, dos); case ABORT: logger.debug("Response was ABORT"); - throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF()); + throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF()); default: logger.debug("Response was {}; unable to negotiate codec", statusCode); - return null; // Unable to negotiate codec + return null; // Unable to negotiate codec } - } + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java index eb7312d..bfa5c82 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java @@ -21,52 +21,57 @@ import java.util.Map; import org.apache.nifi.remote.protocol.DataPacket; - /** * <p> * Provides a transaction for performing site-to-site data transfers. * </p> - * + * * <p> - * A Transaction is created by calling the - * {@link org.apache.nifi.remote.client.SiteToSiteClient#createTransaction(TransferDirection) createTransaction(TransferDirection)} - * method of a {@link org.apache.nifi.remote.client.SiteToSiteClient SiteToSiteClient}. The resulting Transaction - * can be used to either send or receive data but not both. A new Transaction must be created in order perform the - * other operation. + * A Transaction is created by calling the + * {@link org.apache.nifi.remote.client.SiteToSiteClient#createTransaction(TransferDirection) createTransaction(TransferDirection)} + * method of a + * {@link org.apache.nifi.remote.client.SiteToSiteClient SiteToSiteClient}. The + * resulting Transaction can be used to either send or receive data but not + * both. A new Transaction must be created in order perform the other operation. * </p> - * + * * <p> * The general flow of execute of a Transaction is as follows: * <ol> - * <li>Create the transaction as described above.</li> - * <li>Send data via the {@link #send(DataPacket)} method or receive data via the {@link #receive()} method. This method - * will be called 1 or more times. In the case of receive, this method should be called until the method returns {@code null}, - * signifying that the remote instance is finished sending data. <b>Note:</b> <code>receive()</code> should not be - * called a second time without first fully consuming the stream from the previous Packet that was received.</li> - * <li>Confirm the transaction via the {@link #confirm()} method.</li> - * <li>Either complete the transaction via the {@link #complete(boolean)} method or cancel the transaction - * via the {@link #cancel()} method.</li> + * <li>Create the transaction as described above.</li> + * <li>Send data via the {@link #send(DataPacket)} method or receive data via + * the {@link #receive()} method. This method will be called 1 or more times. In + * the case of receive, this method should be called until the method returns + * {@code null}, signifying that the remote instance is finished sending data. + * <b>Note:</b> <code>receive()</code> should not be called a second time + * without first fully consuming the stream from the previous Packet that was + * received.</li> + * <li>Confirm the transaction via the {@link #confirm()} method.</li> + * <li>Either complete the transaction via the {@link #complete(boolean)} method + * or cancel the transaction via the {@link #cancel()} method.</li> * </ol> * </p> - * + * * <p> - * It is important that the Transaction be terminated in order to free the resources held - * by the Transaction. If a Transaction is not terminated, its resources will not be freed and - * if the Transaction holds connections from a connection pool, the connections in that pool - * will eventually become exhausted. A Transaction is terminated by calling one of the following + * It is important that the Transaction be terminated in order to free the + * resources held by the Transaction. If a Transaction is not terminated, its + * resources will not be freed and if the Transaction holds connections from a + * connection pool, the connections in that pool will eventually become + * exhausted. A Transaction is terminated by calling one of the following * methods: - * <ul> - * <li>{@link #complete(boolean)}</li> - * <li>{@link #cancel()}</li> - * <li>{@link #error()}</li> - * </ul> + * <ul> + * <li>{@link #complete(boolean)}</li> + * <li>{@link #cancel()}</li> + * <li>{@link #error()}</li> + * </ul> * </p> - * + * * <p> - * If at any point an IOException is thrown from one of the methods of the Transaction, that Transaction - * is automatically closed via a call to {@link #error()}. + * If at any point an IOException is thrown from one of the methods of the + * Transaction, that Transaction is automatically closed via a call to + * {@link #error()}. * </p> - * + * * <p> * The Transaction class should not be assumed to be thread-safe. * </p> @@ -75,140 +80,146 @@ public interface Transaction { /** * Sends information to the remote NiFi instance. - * + * * @param dataPacket the data packet to send - * @throws IOException + * @throws IOException if unable to send */ void send(DataPacket dataPacket) throws IOException; - + /** - * Sends the given byte array as the content of a {@link DataPacket} along with the - * provided attributes - * - * @param content - * @param attributes - * @throws IOException + * Sends the given byte array as the content of a {@link DataPacket} along + * with the provided attributes + * + * @param content to send + * @param attributes of the content + * @throws IOException if unable to send */ void send(byte[] content, Map<String, String> attributes) throws IOException; - + /** - * Retrieves information from the remote NiFi instance, if any is available. If no data is available, will return - * {@code null}. It is important to consume all data from the remote NiFi instance before attempting to - * call {@link #confirm()}. This is because the sender is always responsible for determining when the Transaction - * has finished. This is done in order to prevent the need for a round-trip network request to receive data for + * Retrieves information from the remote NiFi instance, if any is available. + * If no data is available, will return {@code null}. It is important to + * consume all data from the remote NiFi instance before attempting to call + * {@link #confirm()}. This is because the sender is always responsible for + * determining when the Transaction has finished. This is done in order to + * prevent the need for a round-trip network request to receive data for * each data packet. - * - * @return the DataPacket received, or {@code null} if there is no more data to receive. - * @throws IOException + * + * @return the DataPacket received, or {@code null} if there is no more data + * to receive. + * @throws IOException if unable to receive */ DataPacket receive() throws IOException; /** * <p> - * Confirms the data that was sent or received by comparing CRC32's of the data sent and the data received. + * Confirms the data that was sent or received by comparing CRC32's of the + * data sent and the data received. + * </p> + * + * <p> + * Even if the protocol being used to send the data is reliable and + * guarantees ordering of packets (such as TCP), it is still required that + * we confirm the transaction before completing the transaction. This is + * done as "safety net" or a defensive programming technique. Mistakes + * happen, and this mechanism helps to ensure that if a bug exists somewhere + * along the line that we do not end up sending or receiving corrupt data. + * If the CRC32 of the sender and the CRC32 of the receiver do not match, an + * IOException will be thrown and both the sender and receiver will cancel + * the transaction automatically. * </p> - * + * * <p> - * Even if the protocol being used to send the data is reliable and guarantees ordering of packets (such as TCP), - * it is still required that we confirm the transaction before completing the transaction. This is done as - * "safety net" or a defensive programming technique. Mistakes happen, and this mechanism helps to ensure that if - * a bug exists somewhere along the line that we do not end up sending or receiving corrupt data. If the - * CRC32 of the sender and the CRC32 of the receiver do not match, an IOException will be thrown and both the - * sender and receiver will cancel the transaction automatically. + * If the {@link TransferDirection} of this Transaction is RECEIVE, this + * method will throw an Exception unless all data from the remote instance + * has been consumed (i.e., a call to {@link #receive()} returns + * {@code null}). * </p> - * + * + * <p> + * If the {@link TransferDirection} of this Transaction is SEND, calling + * this method dictates that no more data will be sent in this transaction. + * I.e., there will be no more calls to {@link #send(DataPacket)}. + * </p> + * + * @throws IOException if unable to confirm transaction + */ + void confirm() throws IOException; + + /** * <p> - * If the {@link TransferDirection} of this Transaction is RECEIVE, this method will throw an Exception unless - * all data from the remote instance has been consumed (i.e., a call to {@link #receive()} returns {@code null}). + * Completes the transaction and indicates to both the sender and receiver + * that the data transfer was successful. * </p> - * + * + * @throws IOException if unable to complete + * + * @return a TransactionCompletion that contains details about the + * Transaction + */ + TransactionCompletion complete() throws IOException; + + /** * <p> - * If the {@link TransferDirection} of this Transaction is SEND, calling this method dictates that no more data will be - * sent in this transaction. I.e., there will be no more calls to {@link #send(DataPacket)}. + * Cancels this transaction, indicating to the sender that the data has not + * been successfully received so that the sender can retry or handle however + * is appropriate. * </p> - * - * @throws IOException + * + * @param explanation an explanation to tell the other party why the + * transaction was canceled. + * @throws IOException if unable to cancel */ - void confirm() throws IOException; - - /** + void cancel(final String explanation) throws IOException; + + /** * <p> - * Completes the transaction and indicates to both the sender and receiver that the data transfer was - * successful. + * Sets the TransactionState of the Transaction to + * {@link TransactionState#ERROR}, and closes the Transaction. The + * underlying connection should not be returned to a connection pool in this + * case. * </p> - * - * @throws IOException - * - * @return a TransactionCompletion that contains details about the Transaction */ - TransactionCompletion complete() throws IOException; - - /** - * <p> - * Cancels this transaction, indicating to the sender that the data has not been successfully received so that - * the sender can retry or handle however is appropriate. - * </p> - * - * @param explanation an explanation to tell the other party why the transaction was canceled. - * @throws IOException - */ - void cancel(final String explanation) throws IOException; - - - /** - * <p> - * Sets the TransactionState of the Transaction to {@link TransactionState#ERROR}, and closes - * the Transaction. The underlying connection should not be returned to a connection pool in this case. - * </p> - */ - void error(); - - - /** - * Returns the current state of the Transaction. - * @return - * @throws IOException - */ - TransactionState getState() throws IOException; - - /** - * Returns a Communicant that represents the other side of this Transaction (i.e., - * the remote NiFi instance) - * @return - */ - Communicant getCommunicant(); - - - public enum TransactionState { - /** - * Transaction has been started but no data has been sent or received. - */ - TRANSACTION_STARTED, - - /** - * Transaction has been started and data has been sent or received. - */ - DATA_EXCHANGED, - - /** - * Data that has been transferred has been confirmed via its CRC. Transaction is - * ready to be completed. - */ - TRANSACTION_CONFIRMED, - - /** - * Transaction has been successfully completed. - */ - TRANSACTION_COMPLETED, - - /** - * The Transaction has been canceled. - */ - TRANSACTION_CANCELED, - - /** - * The Transaction ended in an error. - */ - ERROR; - } + void error(); + + /** + * @return the current state of the Transaction. + * @throws IOException ioe + */ + TransactionState getState() throws IOException; + + /** + * @return a Communicant that represents the other side of this Transaction + * (i.e., the remote NiFi instance) + */ + Communicant getCommunicant(); + + public enum TransactionState { + + /** + * Transaction has been started but no data has been sent or received. + */ + TRANSACTION_STARTED, + /** + * Transaction has been started and data has been sent or received. + */ + DATA_EXCHANGED, + /** + * Data that has been transferred has been confirmed via its CRC. + * Transaction is ready to be completed. + */ + TRANSACTION_CONFIRMED, + /** + * Transaction has been successfully completed. + */ + TRANSACTION_COMPLETED, + /** + * The Transaction has been canceled. + */ + TRANSACTION_CANCELED, + /** + * The Transaction ended in an error. + */ + ERROR; + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java index be5f73a..1587e87 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java @@ -21,43 +21,44 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.remote.protocol.DataPacket; - /** - * A TransactionCompletion provides information about a {@link Transaction} that has completed successfully. + * A TransactionCompletion provides information about a {@link Transaction} that + * has completed successfully. */ public interface TransactionCompletion { - + /** - * When a sending to a NiFi instance, the server may accept the content sent to it - * but indicate that its queues are full and that the client should backoff sending - * data for a bit. This method returns <code>true</code> if the server did in fact - * request that, <code>false</code> otherwise. - * @return + * When a sending to a NiFi instance, the server may accept the content sent + * to it but indicate that its queues are full and that the client should + * backoff sending data for a bit. + * + * @return <code>true</code> if the server did in fact request that, + * <code>false</code> otherwise */ boolean isBackoff(); - + /** - * Returns the number of Data Packets that were sent to or received from the remote - * NiFi instance in the Transaction - * @return + * @return the number of Data Packets that were sent to or received from the + * remote NiFi instance in the Transaction */ int getDataPacketsTransferred(); - + /** - * Returns the number of bytes of DataPacket content that were sent to or received from - * the remote NiFI instance in the Transaction. Note that this is different than the number - * of bytes actually transferred between the client and server, as it does not take into - * account the attributes or protocol-specific information that is exchanged but rather - * takes into account only the data in the {@link InputStream} of the {@link DataPacket} - * @return + * @return the number of bytes of DataPacket content that were sent to or + * received from the remote NiFI instance in the Transaction. Note that this + * is different than the number of bytes actually transferred between the + * client and server, as it does not take into account the attributes or + * protocol-specific information that is exchanged but rather takes into + * account only the data in the {@link InputStream} of the + * {@link DataPacket} */ long getBytesTransferred(); - + /** - * Returns the amount of time that the Transaction took, from the time that the Transaction - * was created to the time that the Transaction was completed. - * @param timeUnit - * @return + * @param timeUnit unit of time for which to report the duration + * @return the amount of time that the Transaction took, from the time that + * the Transaction was created to the time that the Transaction was + * completed */ long getDuration(TimeUnit timeUnit); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java index 45029a4..979ad9c 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java @@ -16,17 +16,16 @@ */ package org.apache.nifi.remote; - /** - * An enumeration for specifying the direction in which data should be transferred between a client - * and a remote NiFi instance. + * An enumeration for specifying the direction in which data should be + * transferred between a client and a remote NiFi instance. */ public enum TransferDirection { - /** - * The client is to send data to the remote instance. - */ + + /** + * The client is to send data to the remote instance. + */ SEND, - /** * The client is to receive data from the remote instance. */