http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 9a4fc5b..24c82dd 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -56,35 +56,35 @@ import com.amazonaws.services.s3.model.StorageClass; @SeeAlso({FetchS3Object.class}) @Tags({"Amazon", "S3", "AWS", "Archive", "Put"}) @CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket") -@DynamicProperty(name="The name of a User-Defined Metadata field to add to the S3 Object", - value="The value of a User-Defined Metadata field to add to the S3 Object", - description="Allows user-defined metadata to be added to the S3 object as key/value pairs", - supportsExpressionLanguage=true) -@ReadsAttribute(attribute="filename", description="Uses the FlowFile's filename as the filename for the S3 object") +@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", + value = "The value of a User-Defined Metadata field to add to the S3 Object", + description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", + supportsExpressionLanguage = true) +@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object") @WritesAttributes({ - @WritesAttribute(attribute="s3.version", description="The version of the S3 Object that was put to S3"), - @WritesAttribute(attribute="s3.etag", description="The ETag of the S3 Object"), - @WritesAttribute(attribute="s3.expiration", description="A human-readable form of the expiration date of the S3 object, if one is set") + @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"), + @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"), + @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set") }) public class PutS3Object extends AbstractS3Processor { + public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder() - .name("Expiration Time Rule") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Expiration Time Rule") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder() - .name("Storage Class") - .required(true) - .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name()) - .defaultValue(StorageClass.Standard.name()) - .build(); + .name("Storage Class") + .required(true) + .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name()) + .defaultValue(StorageClass.Standard.name()) + .build(); public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, - FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER) ); - + Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, + FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -94,21 +94,21 @@ public class PutS3Object extends AbstractS3Processor { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .dynamic(true) - .build(); + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); } public void onTrigger(final ProcessContext context, final ProcessSession session) { FlowFile flowFile = session.get(); - if ( flowFile == null ) { + if (flowFile == null) { return; } - + final long startNanos = System.nanoTime(); - + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); @@ -123,59 +123,59 @@ public class PutS3Object extends AbstractS3Processor { final ObjectMetadata objectMetadata = new ObjectMetadata(); objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key())); objectMetadata.setContentLength(ff.getSize()); - + final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue(); - if ( expirationRule != null ) { + if (expirationRule != null) { objectMetadata.setExpirationTimeRuleId(expirationRule); } - + final Map<String, String> userMetadata = new HashMap<>(); - for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) { - if ( entry.getKey().isDynamic() ) { + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + if (entry.getKey().isDynamic()) { final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue(); userMetadata.put(entry.getKey().getName(), value); } } - - if ( !userMetadata.isEmpty() ) { + + if (!userMetadata.isEmpty()) { objectMetadata.setUserMetadata(userMetadata); } - + final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata); request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue())); final AccessControlList acl = createACL(context, ff); - if ( acl != null ) { + if (acl != null) { request.setAccessControlList(acl); } - + final PutObjectResult result = s3.putObject(request); - if ( result.getVersionId() != null ) { + if (result.getVersionId() != null) { attributes.put("s3.version", result.getVersionId()); } - + attributes.put("s3.etag", result.getETag()); - + final Date expiration = result.getExpirationTime(); - if ( expiration != null ) { + if (expiration != null) { attributes.put("s3.expiration", expiration.toString()); } } } }); - - if ( !attributes.isEmpty() ) { + + if (!attributes.isEmpty()) { flowFile = session.putAllAttributes(flowFile, attributes); } session.transfer(flowFile, REL_SUCCESS); - + final String url = "http://" + bucket + ".s3.amazonaws.com/" + key; final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); session.getProvenanceReporter().send(flowFile, url, millis); - - getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis}); + + getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[]{ff, millis}); } catch (final ProcessException | AmazonClientException pe) { - getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe}); + getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[]{flowFile, pe}); session.transfer(flowFile, REL_FAILURE); } } -} \ No newline at end of file +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java index 5447169..5b57647 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java @@ -28,30 +28,28 @@ import com.amazonaws.services.sns.AmazonSNSClient; public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> { - protected static final AllowableValue ARN_TYPE_TOPIC = - new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic"); - protected static final AllowableValue ARN_TYPE_TARGET = - new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber"); - + protected static final AllowableValue ARN_TYPE_TOPIC + = new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic"); + protected static final AllowableValue ARN_TYPE_TARGET + = new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber"); + public static final PropertyDescriptor ARN = new PropertyDescriptor.Builder() - .name("Amazon Resource Name (ARN)") - .description("The name of the resource to which notifications should be published") - .expressionLanguageSupported(true) - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - + .name("Amazon Resource Name (ARN)") + .description("The name of the resource to which notifications should be published") + .expressionLanguageSupported(true) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor ARN_TYPE = new PropertyDescriptor.Builder() - .name("ARN Type") - .description("The type of Amazon Resource Name that is being used.") - .expressionLanguageSupported(false) - .required(true) - .allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET) - .defaultValue(ARN_TYPE_TOPIC.getValue()) - .build(); - - - + .name("ARN Type") + .description("The type of Amazon Resource Name that is being used.") + .expressionLanguageSupported(false) + .required(true) + .allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET) + .defaultValue(ARN_TYPE_TOPIC.getValue()) + .build(); + @Override protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { return new AmazonSNSClient(credentials, config); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java index 1de3251..b1a604f 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java @@ -46,34 +46,34 @@ import com.amazonaws.services.sns.model.PublishRequest; public class PutSNS extends AbstractSNSProcessor { public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder() - .name("Character Set") - .description("The character set in which the FlowFile's content is encoded") - .defaultValue("UTF-8") - .expressionLanguageSupported(true) - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .required(true) - .build(); + .name("Character Set") + .description("The character set in which the FlowFile's content is encoded") + .defaultValue("UTF-8") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .required(true) + .build(); public static final PropertyDescriptor USE_JSON_STRUCTURE = new PropertyDescriptor.Builder() - .name("Use JSON Structure") - .description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'. Additional elements can be used to send different messages to different protocols. See the Amazon SNS Documentation for more information.") - .defaultValue("false") - .allowableValues("true", "false") - .required(true) - .build(); + .name("Use JSON Structure") + .description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'. Additional elements can be used to send different messages to different protocols. See the Amazon SNS Documentation for more information.") + .defaultValue("false") + .allowableValues("true", "false") + .required(true) + .build(); public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder() - .name("E-mail Subject") - .description("The optional subject to use for any subscribers that are subscribed via E-mail") - .expressionLanguageSupported(true) - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - + .name("E-mail Subject") + .description("The optional subject to use for any subscribers that are subscribed via E-mail") + .expressionLanguageSupported(true) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT, - USE_JSON_STRUCTURE, CHARACTER_ENCODING) ); + Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT, + USE_JSON_STRUCTURE, CHARACTER_ENCODING)); public static final int MAX_SIZE = 256 * 1024; - + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return properties; @@ -82,71 +82,70 @@ public class PutSNS extends AbstractSNSProcessor { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .required(false) - .dynamic(true) - .build(); + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .dynamic(true) + .build(); } - - + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { FlowFile flowFile = session.get(); - if ( flowFile == null ) { + if (flowFile == null) { return; } - if ( flowFile.getSize() > MAX_SIZE ) { - getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[] {flowFile}); + if (flowFile.getSize() > MAX_SIZE) { + getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[]{flowFile}); session.transfer(flowFile, REL_FAILURE); return; } - + final Charset charset = Charset.forName(context.getProperty(CHARACTER_ENCODING).evaluateAttributeExpressions(flowFile).getValue()); - + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); session.exportTo(flowFile, baos); final String message = new String(baos.toByteArray(), charset); - + final AmazonSNSClient client = getClient(); final PublishRequest request = new PublishRequest(); request.setMessage(message); - - if ( context.getProperty(USE_JSON_STRUCTURE).asBoolean() ) { + + if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) { request.setMessageStructure("json"); } - + final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue(); final String arnType = context.getProperty(ARN_TYPE).getValue(); - if ( arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue()) ) { + if (arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue())) { request.setTopicArn(arn); } else { request.setTargetArn(arn); } - + final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue(); - if ( subject != null ) { + if (subject != null) { request.setSubject(subject); } - for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) { - if ( entry.getKey().isDynamic() && !isEmpty(entry.getValue()) ) { + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + if (entry.getKey().isDynamic() && !isEmpty(entry.getValue())) { final MessageAttributeValue value = new MessageAttributeValue(); value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue()); value.setDataType("String"); request.addMessageAttributesEntry(entry.getKey().getName(), value); } } - + try { client.publish(request); session.transfer(flowFile, REL_SUCCESS); session.getProvenanceReporter().send(flowFile, arn); - getLogger().info("Successfully published notification for {}", new Object[] {flowFile}); + getLogger().info("Successfully published notification for {}", new Object[]{flowFile}); } catch (final Exception e) { - getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[] {flowFile, e}); + getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e}); session.transfer(flowFile, REL_FAILURE); return; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java index 2ef749f..3cee02d 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java @@ -28,20 +28,20 @@ import com.amazonaws.services.sqs.AmazonSQSClient; public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQSClient> { public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("The maximum number of messages to send in a single network request") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("25") - .build(); + .name("Batch Size") + .description("The maximum number of messages to send in a single network request") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("25") + .build(); public static final PropertyDescriptor QUEUE_URL = new PropertyDescriptor.Builder() - .name("Queue URL") - .description("The URL of the queue to act upon") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .required(true) - .build(); + .name("Queue URL") + .description("The URL of the queue to act upon") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); @Override protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java index 2416044..65e020d 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java @@ -40,54 +40,54 @@ import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; @Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"}) @CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue") public class DeleteSQS extends AbstractSQSProcessor { + public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder() - .name("Receipt Handle") - .description("The identifier that specifies the receipt of the message") - .expressionLanguageSupported(true) - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("${sqs.receipt.handle}") - .build(); - + .name("Receipt Handle") + .description("The identifier that specifies the receipt of the message") + .expressionLanguageSupported(true) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("${sqs.receipt.handle}") + .build(); + public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT) ); + Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return properties; } - @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { List<FlowFile> flowFiles = session.get(1); - if ( flowFiles.isEmpty() ) { + if (flowFiles.isEmpty()) { return; } - + final FlowFile firstFlowFile = flowFiles.get(0); final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue(); - + final AmazonSQSClient client = getClient(); final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(); request.setQueueUrl(queueUrl); - + final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size()); - - for ( final FlowFile flowFile : flowFiles ) { + + for (final FlowFile flowFile : flowFiles) { final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry(); entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue()); entries.add(entry); } - + request.setEntries(entries); - + try { client.deleteMessageBatch(request); - getLogger().info("Successfully deleted {} objects from SQS", new Object[] {flowFiles.size()}); + getLogger().info("Successfully deleted {} objects from SQS", new Object[]{flowFiles.size()}); session.transfer(flowFiles, REL_SUCCESS); } catch (final Exception e) { - getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[] {flowFiles.size(), e}); + getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[]{flowFiles.size(), e}); session.transfer(flowFiles, REL_FAILURE); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java index 6c0257b..929a437 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java @@ -51,116 +51,116 @@ import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; @SupportsBatching -@Tags({ "Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"}) +@Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"}) @SeeAlso({PutSQS.class, DeleteSQS.class}) @CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue") @WritesAttributes({ - @WritesAttribute(attribute="hash.value", description="The MD5 sum of the message"), - @WritesAttribute(attribute="hash.algorithm", description="MD5"), - @WritesAttribute(attribute="sqs.message.id", description="The unique identifier of the SQS message"), - @WritesAttribute(attribute="sqs.receipt.handle", description="The SQS Receipt Handle that is to be used to delete the message from the queue") + @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"), + @WritesAttribute(attribute = "hash.algorithm", description = "MD5"), + @WritesAttribute(attribute = "sqs.message.id", description = "The unique identifier of the SQS message"), + @WritesAttribute(attribute = "sqs.receipt.handle", description = "The SQS Receipt Handle that is to be used to delete the message from the queue") }) public class GetSQS extends AbstractSQSProcessor { + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() - .name("Character Set") - .description("The Character Set that should be used to encode the textual content of the SQS message") - .required(true) - .defaultValue("UTF-8") - .allowableValues(Charset.availableCharsets().keySet().toArray(new String[0])) - .build(); - + .name("Character Set") + .description("The Character Set that should be used to encode the textual content of the SQS message") + .required(true) + .defaultValue("UTF-8") + .allowableValues(Charset.availableCharsets().keySet().toArray(new String[0])) + .build(); + public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() - .name("Auto Delete Messages") - .description("Specifies whether the messages should be automatically deleted by the processors once they have been received.") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); - + .name("Auto Delete Messages") + .description("Specifies whether the messages should be automatically deleted by the processors once they have been received.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder() - .name("Visibility Timeout") - .description("The amount of time after a message is received but not deleted that the message is hidden from other consumers") - .expressionLanguageSupported(false) - .required(true) - .defaultValue("15 mins") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - + .name("Visibility Timeout") + .description("The amount of time after a message is received but not deleted that the message is hidden from other consumers") + .expressionLanguageSupported(false) + .required(true) + .defaultValue("15 mins") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("The maximum number of messages to send in a single network request") - .required(true) - .addValidator(StandardValidators.createLongValidator(1L, 10L, true)) - .defaultValue("10") - .build(); - - + .name("Batch Size") + .description("The maximum number of messages to send in a single network request") + .required(true) + .addValidator(StandardValidators.createLongValidator(1L, 10L, true)) + .defaultValue("10") + .build(); + public static final PropertyDescriptor STATIC_QUEUE_URL = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(QUEUE_URL) - .expressionLanguageSupported(false) - .build(); - + .fromPropertyDescriptor(QUEUE_URL) + .expressionLanguageSupported(false) + .build(); + public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT) ); + Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return properties; } - + @Override public Set<Relationship> getRelationships() { return Collections.singleton(REL_SUCCESS); } - + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { final String queueUrl = context.getProperty(STATIC_QUEUE_URL).getValue(); - + final AmazonSQSClient client = getClient(); - + final ReceiveMessageRequest request = new ReceiveMessageRequest(); request.setAttributeNames(Collections.singleton("All")); request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger()); request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue()); request.setQueueUrl(queueUrl); - + final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); - + final ReceiveMessageResult result; try { result = client.receiveMessage(request); } catch (final Exception e) { - getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[] {e}); + getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e}); context.yield(); return; } - + final List<Message> messages = result.getMessages(); - if ( messages.isEmpty() ) { + if (messages.isEmpty()) { context.yield(); return; } final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean(); - - for ( final Message message : messages ) { + + for (final Message message : messages) { FlowFile flowFile = session.create(); - + final Map<String, String> attributes = new HashMap<>(); - for ( final Map.Entry<String, String> entry : message.getAttributes().entrySet() ) { + for (final Map.Entry<String, String> entry : message.getAttributes().entrySet()) { attributes.put("sqs." + entry.getKey(), entry.getValue()); } - - for ( final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet() ) { + + for (final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) { attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue()); } - + attributes.put("hash.value", message.getMD5OfBody()); attributes.put("hash.algorithm", "md5"); attributes.put("sqs.message.id", message.getMessageId()); attributes.put("sqs.receipt.handle", message.getReceiptHandle()); - + flowFile = session.putAllAttributes(flowFile, attributes); flowFile = session.write(flowFile, new OutputStreamCallback() { @Override @@ -168,37 +168,37 @@ public class GetSQS extends AbstractSQSProcessor { out.write(message.getBody().getBytes(charset)); } }); - + session.transfer(flowFile, REL_SUCCESS); session.getProvenanceReporter().receive(flowFile, queueUrl); - - getLogger().info("Successfully received {} from Amazon SQS", new Object[] {flowFile}); + + getLogger().info("Successfully received {} from Amazon SQS", new Object[]{flowFile}); } - - if ( autoDelete ) { + + if (autoDelete) { // If we want to auto-delete messages, we must fist commit the session to ensure that the data // is persisted in NiFi's repositories. session.commit(); - + final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest(); deleteRequest.setQueueUrl(queueUrl); final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>(); - for ( final Message message : messages ) { + for (final Message message : messages) { final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry(); entry.setId(message.getMessageId()); entry.setReceiptHandle(message.getReceiptHandle()); deleteRequestEntries.add(entry); } - + deleteRequest.setEntries(deleteRequestEntries); - + try { client.deleteMessageBatch(deleteRequest); } catch (final Exception e) { - getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages may be duplicated. Reason for deletion failure: {}", new Object[] {messages.size(), e}); + getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages may be duplicated. Reason for deletion failure: {}", new Object[]{messages.size(), e}); } } - + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java index 81268fe..3961f32 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java @@ -44,29 +44,28 @@ import com.amazonaws.services.sqs.model.MessageAttributeValue; import com.amazonaws.services.sqs.model.SendMessageBatchRequest; import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; - @SupportsBatching @Tags({"Amazon", "AWS", "SQS", "Queue", "Put", "Publish"}) @SeeAlso({GetSQS.class, DeleteSQS.class}) @CapabilityDescription("Publishes a message to an Amazon Simple Queuing Service Queue") -@DynamicProperty(name="The name of a Message Attribute to add to the message", value="The value of the Message Attribute", - description="Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of " - + "the Message Attribute and value will become the value of the Message Attribute", supportsExpressionLanguage=true) +@DynamicProperty(name = "The name of a Message Attribute to add to the message", value = "The value of the Message Attribute", + description = "Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of " + + "the Message Attribute and value will become the value of the Message Attribute", supportsExpressionLanguage = true) public class PutSQS extends AbstractSQSProcessor { public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder() - .name("Delay") - .description("The amount of time to delay the message before it becomes available to consumers") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("0 secs") - .build(); + .name("Delay") + .description("The amount of time to delay the message before it becomes available to consumers") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 secs") + .build(); public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, DELAY, TIMEOUT) ); + Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, DELAY, TIMEOUT)); private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList(); - + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return properties; @@ -75,70 +74,70 @@ public class PutSQS extends AbstractSQSProcessor { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .expressionLanguageSupported(true) - .required(false) - .dynamic(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name(propertyDescriptorName) + .expressionLanguageSupported(true) + .required(false) + .dynamic(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); } - + @OnScheduled public void setup(final ProcessContext context) { userDefinedProperties = new ArrayList<>(); - for ( final PropertyDescriptor descriptor : context.getProperties().keySet() ) { - if ( descriptor.isDynamic() ) { + for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { + if (descriptor.isDynamic()) { userDefinedProperties.add(descriptor); } } } - + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { FlowFile flowFile = session.get(); - if ( flowFile == null ) { + if (flowFile == null) { return; } - + final long startNanos = System.nanoTime(); final AmazonSQSClient client = getClient(); final SendMessageBatchRequest request = new SendMessageBatchRequest(); final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue(); request.setQueueUrl(queueUrl); - + final Set<SendMessageBatchRequestEntry> entries = new HashSet<>(); - + final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry(); entry.setId(flowFile.getAttribute("uuid")); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); session.exportTo(flowFile, baos); final String flowFileContent = baos.toString(); entry.setMessageBody(flowFileContent); - + final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); - - for ( final PropertyDescriptor descriptor : userDefinedProperties ) { + + for (final PropertyDescriptor descriptor : userDefinedProperties) { final MessageAttributeValue mav = new MessageAttributeValue(); mav.setDataType("String"); mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue()); messageAttributes.put(descriptor.getName(), mav); } - + entry.setMessageAttributes(messageAttributes); entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue()); entries.add(entry); - + request.setEntries(entries); - + try { client.sendMessageBatch(request); } catch (final Exception e) { - getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[] {e}); + getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[]{e}); session.transfer(flowFile, REL_FAILURE); return; } - - getLogger().info("Successfully published message to Amazon SQS for {}", new Object[] {flowFile}); + + getLogger().info("Successfully published message to Amazon SQS for {}", new Object[]{flowFile}); session.transfer(flowFile, REL_SUCCESS); final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); session.getProvenanceReporter().send(flowFile, queueUrl, transmissionMillis); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java index 40f9515..0321514 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.s3; import java.io.IOException; @@ -15,30 +31,31 @@ import org.junit.Test; @Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") public class TestFetchS3Object { + private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; - + @Test public void testGet() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object()); runner.setProperty(FetchS3Object.BUCKET, "anonymous-test-bucket-00000000"); runner.setProperty(FetchS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE); runner.setProperty(FetchS3Object.KEY, "folder/1.txt"); - + final Map<String, String> attrs = new HashMap<>(); attrs.put("start", "0"); - + runner.enqueue(new byte[0], attrs); runner.run(1); - + runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1); final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS); final MockFlowFile out = ffs.iterator().next(); - + final byte[] expectedBytes = Files.readAllBytes(Paths.get("src/test/resources/hello.txt")); out.assertContentEquals(new String(expectedBytes)); - for ( final Map.Entry<String, String> entry : out.getAttributes().entrySet() ) { + for (final Map.Entry<String, String> entry : out.getAttributes().entrySet()) { System.out.println(entry.getKey() + " : " + entry.getValue()); } } - + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java index 0a019f3..de7816d 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.s3; import java.io.IOException; @@ -17,55 +33,54 @@ import com.amazonaws.services.s3.model.StorageClass; public class TestPutS3Object { private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; - + @Test public void testSimplePut() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE); runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012"); runner.setProperty(PutS3Object.EXPIRATION_RULE_ID, "Expire Quickly"); - Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() ); - - for (int i=0; i < 3; i++) { + Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); + + for (int i = 0; i < 3; i++) { final Map<String, String> attrs = new HashMap<>(); attrs.put("filename", String.valueOf(i) + ".txt"); runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); } runner.run(3); - + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 3); } - + @Test public void testPutInFolder() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012"); runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE); runner.setProperty(PutS3Object.EXPIRATION_RULE_ID, "Expire Quickly"); - Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() ); - + Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); + final Map<String, String> attrs = new HashMap<>(); attrs.put("filename", "folder/1.txt"); runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); runner.run(); - + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); } - @Test public void testStorageClass() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012"); runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE); runner.setProperty(PutS3Object.STORAGE_CLASS, StorageClass.ReducedRedundancy.name()); - Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() ); - + Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); + final Map<String, String> attrs = new HashMap<>(); attrs.put("filename", "folder/2.txt"); runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); runner.run(); - + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); } @@ -75,12 +90,12 @@ public class TestPutS3Object { runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012"); runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE); runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST, "28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5"); - + final Map<String, String> attrs = new HashMap<>(); attrs.put("filename", "folder/4.txt"); runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); runner.run(); - + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java index b505622..1e914c7 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.sns; import static org.junit.Assert.assertTrue; @@ -14,20 +30,21 @@ import org.junit.Test; @Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") public class TestPutSNS { + private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; - + @Test public void testPublish() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new PutSNS()); runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE); runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:100515378163:test-topic-1"); - assertTrue( runner.setProperty("DynamicProperty", "hello!").isValid() ); - + assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid()); + final Map<String, String> attrs = new HashMap<>(); attrs.put("filename", "1.txt"); runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); runner.run(); - + runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java index de4a5d9..0e70e7b 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.sqs; import java.util.List; @@ -11,6 +27,7 @@ import org.junit.Test; @Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") public class TestGetSQS { + private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; @Test @@ -19,11 +36,11 @@ public class TestGetSQS { runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE); runner.setProperty(GetSQS.TIMEOUT, "30 secs"); runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000"); - + runner.run(1); - + final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); - for ( final MockFlowFile mff : flowFiles ) { + for (final MockFlowFile mff : flowFiles) { System.out.println(mff.getAttributes()); System.out.println(new String(mff.toByteArray())); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java index a90a4ce..1f9851a 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.sqs; import java.io.IOException; @@ -14,6 +30,7 @@ import org.junit.Test; @Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") public class TestPutSQS { + private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; @Test @@ -22,13 +39,13 @@ public class TestPutSQS { runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE); runner.setProperty(PutSQS.TIMEOUT, "30 secs"); runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000"); - Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() ); - + Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); + final Map<String, String> attrs = new HashMap<>(); attrs.put("filename", "1.txt"); runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); runner.run(1); - + runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml index 117d7dd..4435327 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml @@ -30,14 +30,14 @@ <module>nifi-aws-nar</module> </modules> - <dependencyManagement> - <dependencies> - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk</artifactId> - <version>1.9.24</version> - </dependency> - </dependencies> - </dependencyManagement> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk</artifactId> + <version>1.9.24</version> + </dependency> + </dependencies> + </dependencyManagement> </project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/src/main/resources/META-INF/NOTICE b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..bde2a66 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,68 @@ +nifi-geo-nar +Copyright 2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2014 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache HttpComponents + The following NOTICE information applies: + Apache HttpClient + Copyright 1999-2014 The Apache Software Foundation + + Apache HttpCore + Copyright 2005-2014 The Apache Software Foundation + + This project contains annotations derived from JCIP-ANNOTATIONS + Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net + + (ASLv2) Apache Commons Codec + The following NOTICE information applies: + Apache Commons Codec + Copyright 2002-2014 The Apache Software Foundation + + src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java + contains test data from http://aspell.net/test/orig/batch0.tab. + Copyright (C) 2002 Kevin Atkinson (kev...@gnu.org) + + =============================================================================== + + The content of package org.apache.commons.codec.language.bm has been translated + from the original php source code available at http://stevemorse.org/phoneticinfo.htm + with permission from the original authors. + Original source copyright: + Copyright (c) 2008 Alexander Beider & Stephen P. Morse. + + (ASLv2) Apache Commons Logging + The following NOTICE information applies: + Apache Commons Logging + Copyright 2003-2013 The Apache Software Foundation + + (ASLv2) GeoIP2 Java API + The following NOTICE information applies: + GeoIP2 Java API + This software is Copyright (c) 2013 by MaxMind, Inc. + +************************ +Creative Commons Attribution-ShareAlike 3.0 +************************ + +The following binary components are provided under the Creative Commons Attribution-ShareAlike 3.0. See project link for details. + + (CCAS 3.0) MaxMind DB (https://github.com/maxmind/MaxMind-DB) + + + http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-nar/src/main/resources/META-INF/NOTICE b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..30b099f --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,29 @@ +nifi-hl7-nar +Copyright 2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2014 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + +***************** +Mozilla Public License v1.1 +***************** + +The following binary components are provided under the Mozilla Public License v1.1. See project link for details. + + (MPL 1.1) HAPI Base (ca.uhn.hapi:hapi-base:2.2 - http://hl7api.sourceforge.net/) + (MPL 1.1) HAPI Structures (ca.uhn.hapi:hapi-structures-v*:2.2 - http://hl7api.sourceforge.net/) + http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/060a1e0d/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/src/main/resources/META-INF/NOTICE b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..33bcc0d --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,57 @@ +nifi-social-media-nar +Copyright 2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2014 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Commons Codec + The following NOTICE information applies: + Apache Commons Codec + Copyright 2002-2014 The Apache Software Foundation + + src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java + contains test data from http://aspell.net/test/orig/batch0.tab. + Copyright (C) 2002 Kevin Atkinson (kev...@gnu.org) + + =============================================================================== + + The content of package org.apache.commons.codec.language.bm has been translated + from the original php source code available at http://stevemorse.org/phoneticinfo.htm + with permission from the original authors. + Original source copyright: + Copyright (c) 2008 Alexander Beider & Stephen P. Morse. + + (ASLv2) Apache Commons Logging + The following NOTICE information applies: + Apache Commons Logging + Copyright 2003-2013 The Apache Software Foundation + + (ASLv2) Twitter4J + The following NOTICE information applies: + Copyright 2007 Yusuke Yamamoto + + Twitter4J includes software from JSON.org to parse JSON response from the Twitter API. You can see the license term at http://www.JSON.org/license.html + + (ASLv2) JOAuth + The following NOTICE information applies: + JOAuth + Copyright 2010-2013 Twitter, Inc + + (ASLv2) Hosebird Client + The following NOTICE information applies: + Hosebird Client (hbc) + Copyright 2013 Twitter, Inc. \ No newline at end of file