Repository: usergrid Updated Branches: refs/heads/delete-event-updates 1b43bda3f -> 76476f17c
Updates the message flow to allow for multiple processor threads per SQS take thread Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/76476f17 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/76476f17 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/76476f17 Branch: refs/heads/delete-event-updates Commit: 76476f17cf8e8be6f01660db3d21110eda8247f5 Parents: 1b43bda Author: Todd Nine <tn...@apigee.com> Authored: Tue Oct 27 14:35:34 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Tue Oct 27 14:35:34 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 68 +++++++++++--------- .../index/IndexProcessorFig.java | 13 +++- 2 files changed, 51 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/76476f17/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index d93e304..6b9abbc 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -679,35 +679,45 @@ public class AmazonAsyncEventService implements AsyncEventService { } while ( true ); } - } ) - //this won't block our read loop, just reads and proceeds - .map( messages -> { - if ( messages == null || messages.size() == 0 ) { - return null; - } - - try { - List<IndexEventResult> indexEventResults = callEventHandlers( messages ); - List<QueueMessage> messagesToAck = submitToIndex( indexEventResults ); - if ( messagesToAck == null || messagesToAck.size() == 0 ) { - logger.error( "No messages came back from the queue operation should have seen " - + messages.size(), messages ); - return messagesToAck; - } - if ( messagesToAck.size() < messages.size() ) { - logger.error( "Missing messages from queue post operation", messages, - messagesToAck ); - } - //ack each message, but only if we didn't error. - ack( messagesToAck ); - return messagesToAck; - } - catch ( Exception e ) { - logger.error( "failed to ack messages to sqs", e ); - return null; - //do not rethrow so we can process all of them - } - } ); + } ) //this won't block our read loop, just reads and proceeds + .flatMap( sqsMessages -> { + + //do this on a different schedule, and introduce concurrency with flatmap for faster processing + return Observable.just( sqsMessages ) + + .map( messages -> { + if ( messages == null || messages.size() == 0 ) { + return null; + } + + try { + List<IndexEventResult> indexEventResults = + callEventHandlers( messages ); + List<QueueMessage> messagesToAck = + submitToIndex( indexEventResults ); + if ( messagesToAck == null || messagesToAck.size() == 0 ) { + logger.error( + "No messages came back from the queue operation should " + + "have seen " + + messages.size(), messages ); + return messagesToAck; + } + if ( messagesToAck.size() < messages.size() ) { + logger.error( "Missing messages from queue post operation", + messages, messagesToAck ); + } + //ack each message, but only if we didn't error. + ack( messagesToAck ); + return messagesToAck; + } + catch ( Exception e ) { + logger.error( "failed to ack messages to sqs", e ); + return null; + //do not rethrow so we can process all of them + } + } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); + //end flatMap + }, indexProcessorFig.getEventConcurrencyFactor() ); //start in the background http://git-wip-us.apache.org/repos/asf/usergrid/blob/76476f17/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java index 7650c62..9d02717 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@ -36,6 +36,8 @@ public interface IndexProcessorFig extends GuicyFig { String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count"; + String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor"; + String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl"; String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout"; @@ -70,9 +72,18 @@ public interface IndexProcessorFig extends GuicyFig { int getIndexQueueVisibilityTimeout(); /** + * The number of worker threads used when handing off messages from the SQS thread + */ + @Default( "20" ) + @Key( EVENT_CONCURRENCY_FACTOR ) + int getEventConcurrencyFactor(); + + + + /** * The number of worker threads used to read index write requests from the queue. */ - @Default( "16" ) + @Default( "8" ) @Key( ELASTICSEARCH_WORKER_COUNT ) int getWorkerCount();