Repository: nifi
Updated Branches:
  refs/heads/master 7726d069c -> 9dafe2db6
NIFI-1103: Add support for long polling in GetSQS processor

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/428b20fc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/428b20fc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/428b20fc

Branch: refs/heads/master
Commit: 428b20fc2553ac6ae56867630fa10bd3ac355582
Parents: e608642
Author: Adam Lamar <adamond...@gmail.com>
Authored: Thu Nov 12 22:03:36 2015 -0700
Committer: Adam Lamar <adamond...@gmail.com>
Committed: Thu Nov 12 22:03:36 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/nifi/processors/aws/sqs/GetSQS.java | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/428b20fc/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
index 10b17e9..b73dd39 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
@@ -103,8 +103,17 @@ public class GetSQS extends AbstractSQSProcessor {
             .expressionLanguageSupported(false)
             .build();
 
+    public static final PropertyDescriptor RECEIVE_MSG_WAIT_TIME = new PropertyDescriptor.Builder()
+            .name("Receive Message Wait Time")
+            .description("The maximum amount of time to wait on a long polling receive call. Setting this to a value of 1 second or greater will reduce the number of SQS requests and decrease fetch latency at the cost of a constantly active thread.")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .defaultValue("0 sec")
+            .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.SECONDS, 20, TimeUnit.SECONDS))  // 20 seconds is the maximum allowed by SQS
+            .build();
+
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-            Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT));
+            Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -127,6 +136,7 @@ public class GetSQS extends AbstractSQSProcessor {
         request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
         request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
         request.setQueueUrl(queueUrl);
+        request.setWaitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue());
 
         final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());