Add a utility queue to the async event service for things like re-index.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8c92f6e Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8c92f6e Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8c92f6e Branch: refs/heads/asf-site Commit: f8c92f6eff376a58394bca3be2340ace6d0de02c Parents: 5db402d Author: Michael Russo <mru...@apigee.com> Authored: Mon Oct 3 22:48:49 2016 -0700 Committer: Michael Russo <mru...@apigee.com> Committed: Mon Oct 3 22:48:49 2016 -0700 ---------------------------------------------------------------------- .../asyncevents/AsyncEventService.java | 3 +- .../asyncevents/AsyncEventServiceImpl.java | 114 +++++++++++++++---- .../index/IndexProcessorFig.java | 9 ++ .../corepersistence/index/ReIndexAction.java | 5 +- .../index/ReIndexServiceImpl.java | 4 +- .../read/traverse/AbstractReadGraphFilter.java | 2 +- .../AbstractReadReverseGraphFilter.java | 2 +- 7 files changed, 110 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index d833cf7..ec84a0a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -81,8 +81,9 @@ public interface AsyncEventService extends ReIndexAction { /** * * @param indexOperationMessage + * @param forUtilityQueue */ - void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ); + void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue); /** * @param applicationScope http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index a108e40..bc5d139 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -78,6 +78,8 @@ import rx.Subscriber; import rx.Subscription; import rx.schedulers.Schedulers; +import static org.apache.commons.lang.StringUtils.isNotEmpty; + /** * TODO, this whole class is becoming a nightmare. We need to remove all consume from this class and refactor it into the following manner. @@ -103,8 +105,11 @@ public class AsyncEventServiceImpl implements AsyncEventService { // SQS maximum receive messages is 10 public int MAX_TAKE = 10; public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars + public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars + private final QueueManager queue; + private final QueueManager utilityQueue; private final IndexProcessorFig indexProcessorFig; private final QueueFig queueFig; private final IndexProducer indexProducer; @@ -125,6 +130,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { private final Counter indexErrorCounter; private final AtomicLong counter = new AtomicLong(); + private final AtomicLong counterUtility = new AtomicLong(); private final AtomicLong inFlight = new AtomicLong(); private final Histogram messageCycle; private final MapManager esMapPersistence; @@ -160,7 +166,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { this.rxTaskScheduler = rxTaskScheduler; QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL); + QueueScope utilityQueueScope = new QueueScopeImpl(QUEUE_NAME_UTILITY, QueueScope.RegionImplementation.ALL); this.queue = queueManagerFactory.getQueueManager(queueScope); + this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope); this.indexProcessorFig = indexProcessorFig; this.queueFig = queueFig; @@ -201,12 +209,16 @@ public class AsyncEventServiceImpl implements AsyncEventService { } - private void offerTopic( final Serializable operation ) { + private void offerTopic(final Serializable operation, boolean forUtilityQueue) { final Timer.Context timer = this.writeTimer.time(); try { //signal to SQS - this.queue.sendMessageToTopic( operation ); + if (forUtilityQueue) { + this.utilityQueue.sendMessageToTopic(operation); + } else { + this.queue.sendMessageToTopic(operation); + } } catch ( IOException e ) { throw new RuntimeException( "Unable to queue message", e ); @@ -216,12 +228,16 @@ public class AsyncEventServiceImpl implements AsyncEventService { } } - private void offerBatch(final List operations){ - final Timer.Context timer = this.writeTimer.time(); + private void offerBatch(final List operations, boolean forUtilityQueue){ + final Timer.Context timer = this.writeTimer.time(); try { //signal to SQS - this.queue.sendMessages(operations); + if( forUtilityQueue ){ + this.utilityQueue.sendMessages(operations); + }else{ + this.queue.sendMessages(operations); + } } catch (IOException e) { throw new RuntimeException("Unable to queue message", e); } finally { @@ -229,6 +245,15 @@ public class AsyncEventServiceImpl implements AsyncEventService { } } + private void offerBatchToUtilityQueue(final List operations){ + try { + //signal to SQS + this.utilityQueue.sendMessages(operations); + } catch (IOException e) { + throw new RuntimeException("Unable to queue message", e); + } + } + /** * Take message from SQS @@ -246,6 +271,22 @@ public class AsyncEventServiceImpl implements AsyncEventService { } } + /** + * Take message from SQS utility queue + */ + private List<QueueMessage> takeFromUtilityQueue() { + + final Timer.Context timer = this.readTimer.time(); + + try { + return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class); + } + finally { + //stop our timer + timer.stop(); + } + } + /** @@ -271,6 +312,17 @@ public class AsyncEventServiceImpl implements AsyncEventService { } /** + * Ack message in SQS + */ + public void ackUtilityQueue(final List<QueueMessage> messages) { + try{ + utilityQueue.commitMessages( messages ); + }catch(Exception e){ + throw new RuntimeException("Unable to ack messages", e); + } + } + + /** * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed * @param messages * @return @@ -401,7 +453,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy( applicationScope); offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(), - new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) ); + new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), false); } @@ -418,7 +470,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { final IndexOperationMessage indexMessage = eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null); - queueIndexOperationMessage( indexMessage ); + queueIndexOperationMessage( indexMessage, false); } @@ -515,8 +567,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { /** * Queue up an indexOperationMessage for multi region execution * @param indexOperationMessage + * @param forUtilityQueue */ - public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) { + public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue) { // don't try to produce something with nothing if(indexOperationMessage == null || indexOperationMessage.isEmpty()){ @@ -533,8 +586,6 @@ public class AsyncEventServiceImpl implements AsyncEventService { //write to the map in ES esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds ); - - //now queue up the index message final ElasticsearchIndexEvent elasticsearchIndexEvent = @@ -542,7 +593,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { //send to the topic so all regions index the batch - offerTopic( elasticsearchIndexEvent ); + offerTopic( elasticsearchIndexEvent, forUtilityQueue ); } private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) throws IndexDocNotFoundException { @@ -607,7 +658,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { // queue the de-index of old versions to the topic so cleanup happens in all regions offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(), - new EntityIdScope( applicationScope, entityId), markedVersion) ); + new EntityIdScope( applicationScope, entityId), markedVersion), false); } @@ -717,9 +768,14 @@ public class AsyncEventServiceImpl implements AsyncEventService { */ public void start() { final int count = indexProcessorFig.getWorkerCount(); + final int utilityCount = indexProcessorFig.getWorkerCountUtility(); for (int i = 0; i < count; i++) { - startWorker(); + startWorker(QUEUE_NAME); + } + + for (int i = 0; i < utilityCount; i++) { + startWorker(QUEUE_NAME_UTILITY); } } @@ -738,22 +794,31 @@ public class AsyncEventServiceImpl implements AsyncEventService { } - private void startWorker() { + private void startWorker(final String type) { + Preconditions.checkNotNull(type, "Worker type required"); synchronized (mutex) { + boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase()); + Observable<List<QueueMessage>> consumer = Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() { @Override public void call( final Subscriber<? super List<QueueMessage>> subscriber ) { //name our thread so it's easy to see - Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() ); + long threadNum = isUtilityQueue ? counterUtility.incrementAndGet() : counter.incrementAndGet(); + Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum ); - List<QueueMessage> drainList = null; + List<QueueMessage> drainList = null; do { try { - drainList = take(); + if ( isUtilityQueue ){ + drainList = takeFromUtilityQueue(); + }else{ + drainList = take(); + + } //emit our list in it's entity to hand off to a worker pool subscriber.onNext(drainList); @@ -808,7 +873,12 @@ public class AsyncEventServiceImpl implements AsyncEventService { // ack each message if making it to this point if( messagesToAck.size() > 0 ){ - ack( messagesToAck ); + + if ( isUtilityQueue ){ + ackUtilityQueue( messagesToAck ); + }else{ + ack( messagesToAck ); + } } return messagesToAck; @@ -861,7 +931,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { // collect into a list of QueueMessages that can be ack'd later .collect(Collectors.toList()); - queueIndexOperationMessage(combined); + queueIndexOperationMessage(combined, false); return queueMessages; } @@ -871,10 +941,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince); - queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null)); + queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false); } - public void indexBatch(final List<EdgeScope> edges, final long updatedSince) { + public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue) { final List<EntityIndexEvent> batch = new ArrayList<>(); edges.forEach(e -> { @@ -884,7 +954,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { }); - offerBatch( batch ); + offerBatch( batch, forUtilityQueue ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java index 1038408..45dff1c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@ -36,6 +36,8 @@ public interface IndexProcessorFig extends GuicyFig { String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count"; + String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility"; + String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor"; String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl"; @@ -82,6 +84,13 @@ public interface IndexProcessorFig extends GuicyFig { int getWorkerCount(); /** + * The number of worker threads used to read utility requests from the queue ( mostly re-index ). + */ + @Default("2") + @Key(ELASTICSEARCH_WORKER_COUNT_UTILITY) + int getWorkerCountUtility(); + + /** * Set the implementation to use for queuing. * Valid values: TEST, LOCAL, SQS, SNS * NOTE: SQS and SNS equate to the same implementation of Amazon queue services. http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java index 5e201fb..2b3573e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java @@ -39,9 +39,10 @@ public interface ReIndexAction { void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ); /** - * Index a batch list of entities. + * Index a batch list of entities. Goes to the utility queue. * @param edges * @param updatedSince + * @param forUtilityQueue */ - void indexBatch ( final List<EdgeScope> edges, final long updatedSince); + void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index 19fbcfa..b292005 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -169,7 +169,7 @@ public class ReIndexServiceImpl implements ReIndexService { .buffer( indexProcessorFig.getReindexBufferSize()) .doOnNext( edgeScopes -> { logger.info("Sending batch of {} to be indexed.", edgeScopes.size()); - indexService.indexBatch(edgeScopes, modifiedSince); + indexService.indexBatch(edgeScopes, modifiedSince, true); count.addAndGet(edgeScopes.size() ); if( edgeScopes.size() > 0 ) { writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1)); @@ -178,7 +178,7 @@ public class ReIndexServiceImpl implements ReIndexService { .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() )) .subscribeOn( Schedulers.io() ).subscribe(); - + return new ReIndexStatus( jobId, Status.STARTED, 0, 0 ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java index 62f6548..83f4c8b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java @@ -258,7 +258,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single)) .filter(msg -> !msg.isEmpty()) .doOnNext(indexOperation -> { - asyncEventService.queueIndexOperationMessage(indexOperation); + asyncEventService.queueIndexOperationMessage(indexOperation, false); }); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java index dcda98f..1afb524 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java @@ -251,7 +251,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter< .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single)) .filter(msg -> !msg.isEmpty()) .doOnNext(indexOperation -> { - asyncEventService.queueIndexOperationMessage(indexOperation); + asyncEventService.queueIndexOperationMessage(indexOperation, false); }); }