Move deletes to a new delete queue -- read repair will fix attempts to access deleted entities and connections, so index and collection deletes can safely proceed more slowly than other types of changes
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/570e1ab4 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/570e1ab4 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/570e1ab4 Branch: refs/heads/collectionClearJob Commit: 570e1ab4e2d4d356756fedee6b7ed5a1bd2d8e93 Parents: 39ec4f2 Author: Mike Dunker <mdun...@google.com> Authored: Fri Aug 18 15:12:56 2017 -0700 Committer: Mike Dunker <mdun...@google.com> Committed: Fri Aug 18 15:12:56 2017 -0700 ---------------------------------------------------------------------- .../asyncevents/AsyncEventQueueType.java | 35 ++ .../asyncevents/AsyncEventService.java | 4 +- .../asyncevents/AsyncEventServiceImpl.java | 463 +++++++++---------- .../index/IndexProcessorFig.java | 18 + .../corepersistence/index/ReIndexAction.java | 5 +- .../index/ReIndexServiceImpl.java | 3 +- .../read/traverse/AbstractReadGraphFilter.java | 11 +- .../AbstractReadReverseGraphFilter.java | 11 +- 8 files changed, 292 insertions(+), 258 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java new file mode 100644 index 0000000..4b91e17 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents; + +public enum AsyncEventQueueType { + REGULAR ("regular"), UTILITY("utility"), DELETE("delete"); + + private String displayName; + AsyncEventQueueType(String displayName) { + this.displayName = displayName; + } + + @Override + public String toString() { + return displayName; + } +} + http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index cab4e3e..9e346cf 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -84,9 +84,9 @@ public interface AsyncEventService extends ReIndexAction { /** * * @param indexOperationMessage - * @param forUtilityQueue + * @param queueType */ - void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue); + void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType 
queueType); /** * @param applicationScope http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 9501ad3..0e55e9b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -59,6 +59,7 @@ import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.persistence.queue.*; import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl; +import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; @@ -104,13 +105,16 @@ public class AsyncEventServiceImpl implements AsyncEventService { public int MAX_TAKE = 10; public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars + public static final String QUEUE_NAME_DELETE = "delete"; public static final String DEAD_LETTER_SUFFIX = "_dead"; private final LegacyQueueManager indexQueue; private final LegacyQueueManager utilityQueue; + private final LegacyQueueManager deleteQueue; private final LegacyQueueManager indexQueueDead; private final LegacyQueueManager utilityQueueDead; + private final LegacyQueueManager deleteQueueDead; private final IndexProcessorFig indexProcessorFig; private final LegacyQueueFig queueFig; private final 
IndexProducer indexProducer; @@ -132,8 +136,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { private final Counter indexErrorCounter; private final AtomicLong counter = new AtomicLong(); private final AtomicLong counterUtility = new AtomicLong(); + private final AtomicLong counterDelete = new AtomicLong(); private final AtomicLong counterIndexDead = new AtomicLong(); private final AtomicLong counterUtilityDead = new AtomicLong(); + private final AtomicLong counterDeleteDead = new AtomicLong(); private final AtomicLong inFlight = new AtomicLong(); private final Histogram messageCycle; private final MapManager esMapPersistence; @@ -174,16 +180,24 @@ public class AsyncEventServiceImpl implements AsyncEventService { LegacyQueueScope utilityQueueScope = new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL); + LegacyQueueScope deleteQueueScope = + new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL); + LegacyQueueScope indexQueueDeadScope = new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL, true); LegacyQueueScope utilityQueueDeadScope = new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL, true); + LegacyQueueScope deleteQueueDeadScope = + new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL, true); + this.indexQueue = queueManagerFactory.getQueueManager(indexQueueScope); this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope); + this.deleteQueue = queueManagerFactory.getQueueManager(deleteQueueScope); this.indexQueueDead = queueManagerFactory.getQueueManager(indexQueueDeadScope); this.utilityQueueDead = queueManagerFactory.getQueueManager(utilityQueueDeadScope); + this.deleteQueueDead = queueManagerFactory.getQueueManager(deleteQueueDeadScope); this.indexProcessorFig = indexProcessorFig; this.queueFig = queueFig; @@ -206,34 +220,90 @@ public class 
AsyncEventServiceImpl implements AsyncEventService { start(); } + private String getQueueName(AsyncEventQueueType queueType) { + switch (queueType) { + case REGULAR: + return QUEUE_NAME; + + case UTILITY: + return QUEUE_NAME_UTILITY; + + case DELETE: + return QUEUE_NAME_DELETE; + + default: + throw new IllegalArgumentException("Invalid queue type: " + queueType.toString()); + } + } + + private LegacyQueueManager getQueue(AsyncEventQueueType queueType) { + return getQueue(queueType, false); + } + + private LegacyQueueManager getQueue(AsyncEventQueueType queueType, boolean isDeadQueue) { + switch (queueType) { + case REGULAR: + return isDeadQueue ? indexQueueDead : indexQueue; + + case UTILITY: + return isDeadQueue ? utilityQueueDead : utilityQueue; + + case DELETE: + return isDeadQueue ? deleteQueueDead : deleteQueue; + + default: + throw new IllegalArgumentException("Invalid queue type: " + queueType.toString()); + } + } + + private AtomicLong getCounter(AsyncEventQueueType queueType, boolean isDeadQueue) { + switch (queueType) { + case REGULAR: + return isDeadQueue ? counterIndexDead : counter; + + case UTILITY: + return isDeadQueue ? counterUtilityDead : counterUtility; + + case DELETE: + return isDeadQueue ? 
counterDeleteDead : counterDelete; + + default: + throw new IllegalArgumentException("Invalid queue type: " + queueType.toString()); + } + } + + + + /** * Offer the EntityIdScope to SQS */ private void offer(final Serializable operation) { + offer(operation, AsyncEventQueueType.REGULAR); + } + + private void offer(final Serializable operation, AsyncEventQueueType queueType) { final Timer.Context timer = this.writeTimer.time(); try { //signal to SQS - this.indexQueue.sendMessageToLocalRegion( operation ); + getQueue(queueType).sendMessageToLocalRegion(operation); } catch (IOException e) { throw new RuntimeException("Unable to queue message", e); } finally { timer.stop(); } + } - private void offerTopic(final Serializable operation, boolean forUtilityQueue) { + private void offerTopic(final Serializable operation, AsyncEventQueueType queueType) { final Timer.Context timer = this.writeTimer.time(); try { //signal to SQS - if (forUtilityQueue) { - this.utilityQueue.sendMessageToAllRegions(operation); - } else { - this.indexQueue.sendMessageToAllRegions(operation); - } + getQueue(queueType).sendMessageToAllRegions(operation); } catch ( IOException e ) { throw new RuntimeException( "Unable to queue message", e ); @@ -244,15 +314,11 @@ public class AsyncEventServiceImpl implements AsyncEventService { } - private void offerBatch(final List operations, boolean forUtilityQueue){ + private void offerBatch(final List operations, AsyncEventQueueType queueType){ final Timer.Context timer = this.writeTimer.time(); try { //signal to SQS - if( forUtilityQueue ){ - this.utilityQueue.sendMessages(operations); - }else{ - this.indexQueue.sendMessages(operations); - } + getQueue(queueType).sendMessages(operations); } catch (IOException e) { throw new RuntimeException("Unable to queue message", e); } finally { @@ -260,25 +326,16 @@ public class AsyncEventServiceImpl implements AsyncEventService { } } - private void offerBatchToUtilityQueue(final List operations){ - try { - //signal to 
SQS - this.utilityQueue.sendMessages(operations); - } catch (IOException e) { - throw new RuntimeException("Unable to queue message", e); - } - } - /** * Take message */ - private List<LegacyQueueMessage> take() { + private List<LegacyQueueMessage> take(AsyncEventQueueType queueType, boolean isDeadQueue) { final Timer.Context timer = this.readTimer.time(); try { - return indexQueue.getMessages(MAX_TAKE, AsyncEvent.class); + return getQueue(queueType, isDeadQueue).getMessages(MAX_TAKE, AsyncEvent.class); } finally { //stop our timer @@ -286,52 +343,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { } } - /** - * Take message from SQS utility queue - */ - private List<LegacyQueueMessage> takeFromUtilityQueue() { - - final Timer.Context timer = this.readTimer.time(); - - try { - return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class); - } - finally { - //stop our timer - timer.stop(); - } - } - - /** - * Take message from index dead letter queue - */ - private List<LegacyQueueMessage> takeFromIndexDeadQueue() { - - final Timer.Context timer = this.readTimer.time(); - - try { - return indexQueueDead.getMessages(MAX_TAKE, AsyncEvent.class); - } - finally { - //stop our timer - timer.stop(); - } - } - - /** - * Take message from SQS utility dead letter queue - */ - private List<LegacyQueueMessage> takeFromUtilityDeadQueue() { - - final Timer.Context timer = this.readTimer.time(); - - try { - return utilityQueueDead.getMessages(MAX_TAKE, AsyncEvent.class); - } - finally { - //stop our timer - timer.stop(); - } + private List<LegacyQueueMessage> take(AsyncEventQueueType queueType) { + return take(queueType, false); } @@ -362,38 +375,15 @@ public class AsyncEventServiceImpl implements AsyncEventService { } } - - /** - * calls the event handlers and returns a result with information on whether - * it needs to be ack'd and whether it needs to be indexed - * Ack message in SQS - */ - public void ackUtilityQueue(final List<LegacyQueueMessage> messages) { 
- try{ - utilityQueue.commitMessages( messages ); - }catch(Exception e){ - throw new RuntimeException("Unable to ack messages", e); + public void ack(final List<LegacyQueueMessage> messages, AsyncEventQueueType queueType, boolean isDeadQueue) { + if (queueType == AsyncEventQueueType.REGULAR && !isDeadQueue) { + // different functionality + ack(messages); } - } - - /** - * ack messages in index dead letter queue - */ - public void ackIndexDeadQueue(final List<LegacyQueueMessage> messages) { - try{ - indexQueueDead.commitMessages( messages ); - }catch(Exception e){ - throw new RuntimeException("Unable to ack messages", e); + try { + getQueue(queueType, isDeadQueue).commitMessages( messages ); } - } - - /** - * ack messages in utility dead letter queue - */ - public void ackUtilityDeadQueue(final List<LegacyQueueMessage> messages) { - try{ - utilityQueueDead.commitMessages( messages ); - }catch(Exception e){ + catch (Exception e) { throw new RuntimeException("Unable to ack messages", e); } } @@ -536,7 +526,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType()); offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(), - new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), false); + new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), AsyncEventQueueType.REGULAR); } @@ -626,7 +616,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType()); // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op - offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) ); + offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ), AsyncEventQueueType.DELETE ); } private IndexOperationMessage handleEdgeDelete(final LegacyQueueMessage message) { @@ 
-660,9 +650,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { /** * Queue up an indexOperationMessage for multi region execution * @param indexOperationMessage - * @param forUtilityQueue + * @param queueType */ - public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue) { + public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType queueType) { // don't try to produce something with nothing if(indexOperationMessage == null || indexOperationMessage.isEmpty()){ @@ -688,7 +678,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId ); - offerTopic( elasticsearchIndexEvent, forUtilityQueue ); + offerTopic( elasticsearchIndexEvent, queueType ); } private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) @@ -764,7 +754,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType()); offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(), - new EntityIdScope( applicationScope, entityId), markedVersion), false); + new EntityIdScope( applicationScope, entityId), markedVersion), AsyncEventQueueType.DELETE ); } @@ -824,7 +814,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType()); // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op - offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ); + offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ), + AsyncEventQueueType.DELETE ); } private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage 
message) { @@ -879,25 +870,39 @@ public class AsyncEventServiceImpl implements AsyncEventService { public void start() { final int indexCount = indexProcessorFig.getWorkerCount(); final int utilityCount = indexProcessorFig.getWorkerCountUtility(); + final int deleteCount = indexProcessorFig.getWorkerCountDelete(); final int indexDeadCount = indexProcessorFig.getWorkerCountDeadLetter(); final int utilityDeadCount = indexProcessorFig.getWorkerCountUtilityDeadLetter(); - logger.info("Starting queue workers for indexing: index={} indexDLQ={} utility={} utilityDLQ={}", indexCount, indexDeadCount, - utilityCount, utilityDeadCount); + final int deleteDeadCount = indexProcessorFig.getWorkerCountDeleteDeadLetter(); + logger.info("Starting queue workers for indexing: index={} indexDLQ={} utility={} utilityDLQ={} delete={} deleteDLQ={}", + indexCount, indexDeadCount, utilityCount, utilityDeadCount, deleteCount, deleteDeadCount); for (int i = 0; i < indexCount; i++) { - startWorker(QUEUE_NAME); + startWorker(AsyncEventQueueType.REGULAR); } for (int i = 0; i < utilityCount; i++) { - startWorker(QUEUE_NAME_UTILITY); + startWorker(AsyncEventQueueType.UTILITY); } - for (int i = 0; i < indexDeadCount; i++) { - startDeadQueueWorker(QUEUE_NAME); + for (int i = 0; i < deleteCount; i++) { + startWorker(AsyncEventQueueType.DELETE); } + if( indexQueue instanceof SNSQueueManagerImpl) { + logger.info("Queue manager implementation supports dead letters, start dead letter queue workers."); + for (int i = 0; i < indexDeadCount; i++) { + startDeadQueueWorker(AsyncEventQueueType.REGULAR); + } - for (int i = 0; i < utilityDeadCount; i++) { - startDeadQueueWorker(QUEUE_NAME_UTILITY); + for (int i = 0; i < utilityDeadCount; i++) { + startDeadQueueWorker(AsyncEventQueueType.UTILITY); + } + + for (int i = 0; i < deleteDeadCount; i++) { + startDeadQueueWorker(AsyncEventQueueType.DELETE); + } + }else{ + logger.info("Queue manager implementation does NOT support dead letters, NOT starting dead letter 
queue worker."); } } @@ -916,11 +921,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { } - private void startWorker(final String type) { - Preconditions.checkNotNull(type, "Worker type required"); + private void startWorker(final AsyncEventQueueType queueType) { synchronized (mutex) { - boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase()); + String type = getQueueName(queueType); Observable<List<LegacyQueueMessage>> consumer = Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() { @@ -928,20 +932,15 @@ public class AsyncEventServiceImpl implements AsyncEventService { public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) { //name our thread so it's easy to see - long threadNum = isUtilityQueue ? - counterUtility.incrementAndGet() : counter.incrementAndGet(); - Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum ); + long threadNum = getCounter(queueType, false).incrementAndGet(); + Thread.currentThread().setName( "QueueConsumer_" + type + "_" + threadNum ); List<LegacyQueueMessage> drainList = null; do { try { - if ( isUtilityQueue ){ - drainList = takeFromUtilityQueue(); - }else{ - drainList = take(); + drainList = take(queueType); - } //emit our list in it's entity to hand off to a worker pool subscriber.onNext(drainList); @@ -997,7 +996,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { // submit the processed messages to index producer List<LegacyQueueMessage> messagesToAck = - submitToIndex( indexEventResults, isUtilityQueue ); + submitToIndex( indexEventResults, queueType ); if ( messagesToAck.size() < messages.size() ) { logger.warn( @@ -1007,12 +1006,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { // ack each message if making it to this point if( messagesToAck.size() > 0 ){ - - if ( isUtilityQueue ){ - ackUtilityQueue( messagesToAck ); - }else{ - ack( messagesToAck ); - } 
+ ack(messagesToAck, queueType, false); } return messagesToAck; @@ -1036,129 +1030,112 @@ public class AsyncEventServiceImpl implements AsyncEventService { } - private void startDeadQueueWorker(final String type) { - Preconditions.checkNotNull(type, "Worker type required"); + private void startDeadQueueWorker(final AsyncEventQueueType queueType) { + String type = getQueueName(queueType); synchronized (mutex) { - boolean isUtilityDeadQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase()); - Observable<List<LegacyQueueMessage>> consumer = - Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() { - @Override - public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) { + Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() { + @Override + public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) { - //name our thread so it's easy to see - long threadNum = isUtilityDeadQueue ? 
- counterUtilityDead.incrementAndGet() : counterIndexDead.incrementAndGet(); - Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type+ "_" + threadNum ); + //name our thread so it's easy to see + long threadNum = getCounter(queueType, true).incrementAndGet(); + Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type + "_" + threadNum ); - List<LegacyQueueMessage> drainList = null; + List<LegacyQueueMessage> drainList = null; - do { - try { - if ( isUtilityDeadQueue ){ - drainList = takeFromUtilityDeadQueue(); - }else{ - drainList = takeFromIndexDeadQueue(); - } - //emit our list in it's entity to hand off to a worker pool - subscriber.onNext(drainList); + do { + try { + drainList = take(queueType, true); - //take since we're in flight - inFlight.addAndGet( drainList.size() ); + //emit our list in it's entity to hand off to a worker pool + subscriber.onNext(drainList); - } catch ( Throwable t ) { + //take since we're in flight + inFlight.addAndGet( drainList.size() ); - final long sleepTime = indexProcessorFig.getDeadLetterFailureRetryTime(); + } catch ( Throwable t ) { - // there might be an error here during tests, just clean the cache - indexQueueDead.clearQueueNameCache(); + final long sleepTime = indexProcessorFig.getDeadLetterFailureRetryTime(); - if ( t instanceof InvalidQueryException ) { + // there might be an error here during tests, just clean the cache + indexQueueDead.clearQueueNameCache(); - // don't fill up log with exceptions when keyspace and column - // families are not ready during bootstrap/setup - logger.warn( "Failed to dequeue dead letters due to '{}'. Sleeping for {} ms", - t.getMessage(), sleepTime ); + if ( t instanceof InvalidQueryException ) { - } else { - logger.error( "Failed to dequeue dead letters. Sleeping for {} ms", sleepTime, t); - } + // don't fill up log with exceptions when keyspace and column + // families are not ready during bootstrap/setup + logger.warn( "Failed to dequeue dead letters due to '{}'. 
Sleeping for {} ms", + t.getMessage(), sleepTime ); - if ( drainList != null ) { - inFlight.addAndGet( -1 * drainList.size() ); - } + } else { + logger.error( "Failed to dequeue dead letters. Sleeping for {} ms", sleepTime, t); + } - try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {} + if ( drainList != null ) { + inFlight.addAndGet( -1 * drainList.size() ); } + + try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {} } - while ( true ); } - } ) //this won't block our read loop, just reads and proceeds - .flatMap( sqsMessages -> { - - //do this on a different schedule, and introduce concurrency - // with flatmap for faster processing - return Observable.just( sqsMessages ) - - .map( messages -> { - if ( messages == null || messages.size() == 0 ) { - // no messages came from the queue, move on - return null; - } - - try { - // put the dead letter messages back in the appropriate queue - LegacyQueueManager returnQueue = null; - String queueType; - if (isUtilityDeadQueue) { - returnQueue = utilityQueue; - queueType = "utility"; - } else { - returnQueue = indexQueue; - queueType = "index"; - } - List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages); - for (LegacyQueueMessage msg : successMessages) { - logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType, msg.getType(), msg.getMessageId(), msg.getStringBody()); - } - int unsuccessfulMessagesSize = messages.size() - successMessages.size(); - if (unsuccessfulMessagesSize > 0) { - // some messages couldn't be sent to originating queue, log - Set<String> successMessageIds = new HashSet<>(); - for (LegacyQueueMessage msg : successMessages) { - String messageId = msg.getMessageId(); - if (successMessageIds.contains(messageId)) { - logger.warn("Found duplicate messageId in returned messages: {}", messageId); - } else { - successMessageIds.add(messageId); - } - } - for (LegacyQueueMessage msg : messages) { - String messageId = 
msg.getMessageId(); - if (!successMessageIds.contains(messageId)) { - logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType, msg.getType(), messageId, msg.getStringBody()); - } - } - } - - if (isUtilityDeadQueue) { - ackUtilityDeadQueue(successMessages); - } else { - ackIndexDeadQueue(successMessages); - } - - return messages; - } - catch ( Exception e ) { - logger.error( "Failed to ack messages", e ); - return null; - //do not rethrow so we can process all of them - } - } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); - - //end flatMap - }, indexProcessorFig.getEventConcurrencyFactor() ); + while ( true ); + } + } ) //this won't block our read loop, just reads and proceeds + .flatMap( sqsMessages -> { + + //do this on a different schedule, and introduce concurrency + // with flatmap for faster processing + return Observable.just( sqsMessages ) + + .map( messages -> { + if ( messages == null || messages.size() == 0 ) { + // no messages came from the queue, move on + return null; + } + + try { + // put the dead letter messages back in the appropriate queue + LegacyQueueManager returnQueue = getQueue(queueType, false); + + List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages); + for (LegacyQueueMessage msg : successMessages) { + logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType.toString(), msg.getType(), msg.getMessageId(), msg.getStringBody()); + } + int unsuccessfulMessagesSize = messages.size() - successMessages.size(); + if (unsuccessfulMessagesSize > 0) { + // some messages couldn't be sent to originating queue, log + Set<String> successMessageIds = new HashSet<>(); + for (LegacyQueueMessage msg : successMessages) { + String messageId = msg.getMessageId(); + if (successMessageIds.contains(messageId)) { + logger.warn("Found duplicate messageId in returned messages: {}", messageId); + } else { + successMessageIds.add(messageId); + } + } + for 
(LegacyQueueMessage msg : messages) { + String messageId = msg.getMessageId(); + if (!successMessageIds.contains(messageId)) { + logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType.toString(), msg.getType(), messageId, msg.getStringBody()); + } + } + } + + ack(successMessages, queueType, true); + + return messages; + } + catch ( Exception e ) { + logger.error( "Failed to ack messages", e ); + return null; + //do not rethrow so we can process all of them + } + } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); + + //end flatMap + }, indexProcessorFig.getEventConcurrencyFactor() ); //start in the background @@ -1172,7 +1149,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { * Submit results to index and return the queue messages to be ack'd * */ - private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) { + private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, AsyncEventQueueType queueType) { // if nothing came back then return empty list if(indexEventResults==null){ @@ -1199,7 +1176,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { // collect into a list of QueueMessages that can be ack'd later .collect(Collectors.toList()); - queueIndexOperationMessage(combined, forUtilityQueue); + queueIndexOperationMessage(combined, queueType); return queueMessages; } @@ -1210,10 +1187,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { new EntityIndexOperation( applicationScope, id, updatedSince); queueIndexOperationMessage( - eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false); + eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), AsyncEventQueueType.REGULAR ); } - public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue) { + public void indexBatch(final 
List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType queueType) { final List<EntityIndexEvent> batch = new ArrayList<>(); edges.forEach(e -> { @@ -1226,7 +1203,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size()); - offerBatch( batch, forUtilityQueue ); + offerBatch( batch, queueType ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java index 7eecf04..eb63056 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@ -40,10 +40,14 @@ public interface IndexProcessorFig extends GuicyFig { String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility"; + String ELASTICSEARCH_WORKER_COUNT_DELETE = "elasticsearch.worker_count_delete"; + String ELASTICSEARCH_WORKER_COUNT_DEADLETTER = "elasticsearch.worker_count_deadletter"; String ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER = "elasticsearch.worker_count_utility_deadletter"; + String ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER = "elasticsearch.worker_count_delete_deadletter"; + String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor"; String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl"; @@ -105,6 +109,13 @@ public interface IndexProcessorFig extends GuicyFig { int getWorkerCountUtility(); /** + * The number of worker threads used to read delete requests from the queue. 
+ */ + @Default("1") + @Key(ELASTICSEARCH_WORKER_COUNT_DELETE) + int getWorkerCountDelete(); + + /** * The number of worker threads used to read dead messages from the index dead letter queue and reload them into the index queue. */ @Default("1") @@ -119,6 +130,13 @@ int getWorkerCountUtilityDeadLetter(); /** + * The number of worker threads used to read dead messages from the delete dead letter queue and reload them into the delete queue. + */ + @Default("1") + @Key(ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER) + int getWorkerCountDeleteDeadLetter(); + + /** * Set the implementation to use for queuing. * Valid values: TEST, LOCAL, SQS, SNS * NOTE: SQS and SNS equate to the same implementation of Amazon queue services. http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java index 2b3573e..d6bdd93 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.index; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType; import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; @@ -42,7 +43,7 @@ public interface ReIndexAction { * Index a batch list of entities. The destination queue is selected by the supplied queue type.
* @param edges * @param updatedSince - * @param forUtilityQueue + * @param queueType */ - void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue); + void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType queueType); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index 0660d5e..c7371b3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,7 +170,7 @@ public class ReIndexServiceImpl implements ReIndexService { .buffer( indexProcessorFig.getReindexBufferSize()) .doOnNext( edgeScopes -> { logger.info("Sending batch of {} to be indexed.", edgeScopes.size()); - indexService.indexBatch(edgeScopes, modifiedSince, true); + indexService.indexBatch(edgeScopes, modifiedSince, AsyncEventQueueType.UTILITY); count.addAndGet(edgeScopes.size() ); if( edgeScopes.size() > 0 ) { writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1)); http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff 
--git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java index 83f4c8b..4886b08 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl; @@ -124,7 +125,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge); indexMessageObservable - .compose(applyCollector()) + .compose(applyCollector(AsyncEventQueueType.DELETE)) .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) .subscribe(); @@ -139,7 +140,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId); entityDeleteResults.getIndexObservable() - .compose(applyCollector()) + .compose(applyCollector(AsyncEventQueueType.DELETE)) .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) .subscribe(); @@ -159,7 +160,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId); entityDeleteResults.getIndexObservable() - .compose(applyCollector()) + .compose(applyCollector(AsyncEventQueueType.DELETE)) 
.subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) .subscribe(); @@ -252,13 +253,13 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, } } - private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() { + private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) { return observable -> observable .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single)) .filter(msg -> !msg.isEmpty()) .doOnNext(indexOperation -> { - asyncEventService.queueIndexOperationMessage(indexOperation, false); + asyncEventService.queueIndexOperationMessage(indexOperation, queueType); }); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java index 1afb524..e8a20dd 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl; @@ -124,7 +125,7 @@ public abstract class AbstractReadReverseGraphFilter extends 
AbstractPathFilter< final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge); indexMessageObservable - .compose(applyCollector()) + .compose(applyCollector(AsyncEventQueueType.DELETE)) .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) .subscribe(); @@ -139,7 +140,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter< entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId); entityDeleteResults.getIndexObservable() - .compose(applyCollector()) + .compose(applyCollector(AsyncEventQueueType.DELETE)) .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) .subscribe(); @@ -159,7 +160,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter< entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId); entityDeleteResults.getIndexObservable() - .compose(applyCollector()) + .compose(applyCollector(AsyncEventQueueType.DELETE)) .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) .subscribe(); @@ -245,13 +246,13 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter< } } - private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() { + private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) { return observable -> observable .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single)) .filter(msg -> !msg.isEmpty()) .doOnNext(indexOperation -> { - asyncEventService.queueIndexOperationMessage(indexOperation, false); + asyncEventService.queueIndexOperationMessage(indexOperation, queueType); }); }