Repository: nifi
Updated Branches:
  refs/heads/master 7726d069c -> 9dafe2db6


NIFI-1103: Add support for long polling in GetSQS processor


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/428b20fc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/428b20fc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/428b20fc

Branch: refs/heads/master
Commit: 428b20fc2553ac6ae56867630fa10bd3ac355582
Parents: e608642
Author: Adam Lamar <adamond...@gmail.com>
Authored: Thu Nov 12 22:03:36 2015 -0700
Committer: Adam Lamar <adamond...@gmail.com>
Committed: Thu Nov 12 22:03:36 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/nifi/processors/aws/sqs/GetSQS.java | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/428b20fc/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
index 10b17e9..b73dd39 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
@@ -103,8 +103,17 @@ public class GetSQS extends AbstractSQSProcessor {
             .expressionLanguageSupported(false)
             .build();
 
+    public static final PropertyDescriptor RECEIVE_MSG_WAIT_TIME = new PropertyDescriptor.Builder()
+            .name("Receive Message Wait Time")
+            .description("The maximum amount of time to wait on a long polling receive call. Setting this to a value of 1 second or greater will reduce the number of SQS requests and decrease fetch latency at the cost of a constantly active thread.")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .defaultValue("0 sec")
+            .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.SECONDS, 20, TimeUnit.SECONDS))  // 20 seconds is the maximum allowed by SQS
+            .build();
+
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-            Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT));
+            Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -127,6 +136,7 @@ public class GetSQS extends AbstractSQSProcessor {
         request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
         request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
         request.setQueueUrl(queueUrl);
+        request.setWaitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue());
 
         final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
 

Reply via email to