move deletes to new delete queue -- read repair will fix attempts to access 
deleted entities and connections, so indexing and collection deletes can 
proceed more slowly than other types of changes


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/570e1ab4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/570e1ab4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/570e1ab4

Branch: refs/heads/collectionClearJob
Commit: 570e1ab4e2d4d356756fedee6b7ed5a1bd2d8e93
Parents: 39ec4f2
Author: Mike Dunker <mdun...@google.com>
Authored: Fri Aug 18 15:12:56 2017 -0700
Committer: Mike Dunker <mdun...@google.com>
Committed: Fri Aug 18 15:12:56 2017 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventQueueType.java        |  35 ++
 .../asyncevents/AsyncEventService.java          |   4 +-
 .../asyncevents/AsyncEventServiceImpl.java      | 463 +++++++++----------
 .../index/IndexProcessorFig.java                |  18 +
 .../corepersistence/index/ReIndexAction.java    |   5 +-
 .../index/ReIndexServiceImpl.java               |   3 +-
 .../read/traverse/AbstractReadGraphFilter.java  |  11 +-
 .../AbstractReadReverseGraphFilter.java         |  11 +-
 8 files changed, 292 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
new file mode 100644
index 0000000..4b91e17
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+public enum AsyncEventQueueType {
+    REGULAR ("regular"), UTILITY("utility"), DELETE("delete");
+
+    private String displayName;
+    AsyncEventQueueType(String displayName) {
+        this.displayName = displayName;
+    }
+
+    @Override
+    public String toString() {
+        return displayName;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index cab4e3e..9e346cf 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -84,9 +84,9 @@ public interface AsyncEventService extends ReIndexAction {
     /**
      *
      * @param indexOperationMessage
-     * @param forUtilityQueue
+     * @param queueType
      */
-    void queueIndexOperationMessage(final IndexOperationMessage 
indexOperationMessage, boolean forUtilityQueue);
+    void queueIndexOperationMessage(final IndexOperationMessage 
indexOperationMessage, AsyncEventQueueType queueType);
 
     /**
      * @param applicationScope

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 9501ad3..0e55e9b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -59,6 +59,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.*;
 import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
+import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
@@ -104,13 +105,16 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
     public int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS 
limits queue name size to 80 chars
     public static final String QUEUE_NAME_UTILITY = "utility"; //keep this 
short as AWS limits queue name size to 80 chars
+    public static final String QUEUE_NAME_DELETE = "delete";
     public static final String DEAD_LETTER_SUFFIX = "_dead";
 
 
     private final LegacyQueueManager indexQueue;
     private final LegacyQueueManager utilityQueue;
+    private final LegacyQueueManager deleteQueue;
     private final LegacyQueueManager indexQueueDead;
     private final LegacyQueueManager utilityQueueDead;
+    private final LegacyQueueManager deleteQueueDead;
     private final IndexProcessorFig indexProcessorFig;
     private final LegacyQueueFig queueFig;
     private final IndexProducer indexProducer;
@@ -132,8 +136,10 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
     private final Counter indexErrorCounter;
     private final AtomicLong counter = new AtomicLong();
     private final AtomicLong counterUtility = new AtomicLong();
+    private final AtomicLong counterDelete = new AtomicLong();
     private final AtomicLong counterIndexDead = new AtomicLong();
     private final AtomicLong counterUtilityDead = new AtomicLong();
+    private final AtomicLong counterDeleteDead = new AtomicLong();
     private final AtomicLong inFlight = new AtomicLong();
     private final Histogram messageCycle;
     private final MapManager esMapPersistence;
@@ -174,16 +180,24 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         LegacyQueueScope utilityQueueScope =
             new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, 
LegacyQueueScope.RegionImplementation.ALL);
 
+        LegacyQueueScope deleteQueueScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, 
LegacyQueueScope.RegionImplementation.ALL);
+
         LegacyQueueScope indexQueueDeadScope =
             new LegacyQueueScopeImpl(QUEUE_NAME, 
LegacyQueueScope.RegionImplementation.ALL, true);
 
         LegacyQueueScope utilityQueueDeadScope =
             new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, 
LegacyQueueScope.RegionImplementation.ALL, true);
 
+        LegacyQueueScope deleteQueueDeadScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, 
LegacyQueueScope.RegionImplementation.ALL, true);
+
         this.indexQueue = queueManagerFactory.getQueueManager(indexQueueScope);
         this.utilityQueue = 
queueManagerFactory.getQueueManager(utilityQueueScope);
+        this.deleteQueue = 
queueManagerFactory.getQueueManager(deleteQueueScope);
         this.indexQueueDead = 
queueManagerFactory.getQueueManager(indexQueueDeadScope);
         this.utilityQueueDead = 
queueManagerFactory.getQueueManager(utilityQueueDeadScope);
+        this.deleteQueueDead = 
queueManagerFactory.getQueueManager(deleteQueueDeadScope);
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueFig = queueFig;
@@ -206,34 +220,90 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         start();
     }
 
+    private String getQueueName(AsyncEventQueueType queueType) {
+        switch (queueType) {
+            case REGULAR:
+                return QUEUE_NAME;
+
+            case UTILITY:
+                return QUEUE_NAME_UTILITY;
+
+            case DELETE:
+                return QUEUE_NAME_DELETE;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + 
queueType.toString());
+        }
+    }
+
+    private LegacyQueueManager getQueue(AsyncEventQueueType queueType) {
+        return getQueue(queueType, false);
+    }
+
+    private LegacyQueueManager getQueue(AsyncEventQueueType queueType, boolean 
isDeadQueue) {
+        switch (queueType) {
+            case REGULAR:
+                return isDeadQueue ? indexQueueDead : indexQueue;
+
+            case UTILITY:
+                return isDeadQueue ? utilityQueueDead : utilityQueue;
+
+            case DELETE:
+                return isDeadQueue ? deleteQueueDead : deleteQueue;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + 
queueType.toString());
+        }
+    }
+
+    private AtomicLong getCounter(AsyncEventQueueType queueType, boolean 
isDeadQueue) {
+        switch (queueType) {
+            case REGULAR:
+                return isDeadQueue ? counterIndexDead : counter;
+
+            case UTILITY:
+                return isDeadQueue ? counterUtilityDead : counterUtility;
+
+            case DELETE:
+                return isDeadQueue ? counterDeleteDead : counterDelete;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + 
queueType.toString());
+        }
+    }
+
+
+
+
 
     /**
      * Offer the EntityIdScope to SQS
      */
     private void offer(final Serializable operation) {
+        offer(operation, AsyncEventQueueType.REGULAR);
+    }
+
+    private void offer(final Serializable operation, AsyncEventQueueType 
queueType) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            this.indexQueue.sendMessageToLocalRegion( operation );
+            getQueue(queueType).sendMessageToLocalRegion(operation);
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
             timer.stop();
         }
+
     }
 
 
-    private void offerTopic(final Serializable operation, boolean 
forUtilityQueue) {
+    private void offerTopic(final Serializable operation, AsyncEventQueueType 
queueType) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            if (forUtilityQueue) {
-                this.utilityQueue.sendMessageToAllRegions(operation);
-            } else {
-                this.indexQueue.sendMessageToAllRegions(operation);
-            }
+            getQueue(queueType).sendMessageToAllRegions(operation);
         }
         catch ( IOException e ) {
             throw new RuntimeException( "Unable to queue message", e );
@@ -244,15 +314,11 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
     }
 
 
-    private void offerBatch(final List operations, boolean forUtilityQueue){
+    private void offerBatch(final List operations, AsyncEventQueueType 
queueType){
         final Timer.Context timer = this.writeTimer.time();
         try {
             //signal to SQS
-            if( forUtilityQueue ){
-                this.utilityQueue.sendMessages(operations);
-            }else{
-                this.indexQueue.sendMessages(operations);
-            }
+            getQueue(queueType).sendMessages(operations);
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -260,25 +326,16 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         }
     }
 
-    private void offerBatchToUtilityQueue(final List operations){
-        try {
-            //signal to SQS
-            this.utilityQueue.sendMessages(operations);
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to queue message", e);
-        }
-    }
-
 
     /**
      * Take message
      */
-    private List<LegacyQueueMessage> take() {
+    private List<LegacyQueueMessage> take(AsyncEventQueueType queueType, 
boolean isDeadQueue) {
 
         final Timer.Context timer = this.readTimer.time();
 
         try {
-            return indexQueue.getMessages(MAX_TAKE, AsyncEvent.class);
+            return getQueue(queueType, isDeadQueue).getMessages(MAX_TAKE, 
AsyncEvent.class);
         }
         finally {
             //stop our timer
@@ -286,52 +343,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         }
     }
 
-    /**
-     * Take message from SQS utility queue
-     */
-    private List<LegacyQueueMessage> takeFromUtilityQueue() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
-    }
-
-    /**
-     * Take message from index dead letter queue
-     */
-    private List<LegacyQueueMessage> takeFromIndexDeadQueue() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return indexQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
-    }
-
-    /**
-     * Take message from SQS utility dead letter queue
-     */
-    private List<LegacyQueueMessage> takeFromUtilityDeadQueue() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return utilityQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
+    private List<LegacyQueueMessage> take(AsyncEventQueueType queueType) {
+        return take(queueType, false);
     }
 
 
@@ -362,38 +375,15 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         }
     }
 
-
-    /**
-     * calls the event handlers and returns a result with information on 
whether
-     * it needs to be ack'd and whether it needs to be indexed
-     * Ack message in SQS
-     */
-    public void ackUtilityQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            utilityQueue.commitMessages( messages );
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
+    public void ack(final List<LegacyQueueMessage> messages, 
AsyncEventQueueType queueType, boolean isDeadQueue) {
+        if (queueType == AsyncEventQueueType.REGULAR && !isDeadQueue) {
+            // different functionality
+            ack(messages);
         }
-    }
-
-    /**
-     * ack messages in index dead letter queue
-     */
-    public void ackIndexDeadQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            indexQueueDead.commitMessages( messages );
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
+        try {
+            getQueue(queueType, isDeadQueue).commitMessages( messages );
         }
-    }
-
-    /**
-     * ack messages in utility dead letter queue
-     */
-    public void ackUtilityDeadQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            utilityQueueDead.commitMessages( messages );
-        }catch(Exception e){
+        catch (Exception e) {
             throw new RuntimeException("Unable to ack messages", e);
         }
     }
@@ -536,7 +526,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
             applicationScope.getApplication().getUuid(), 
applicationScope.getApplication().getType());
 
         offerTopic( new InitializeApplicationIndexEvent( 
queueFig.getPrimaryRegion(),
-            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), 
false);
+            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), 
AsyncEventQueueType.REGULAR);
     }
 
 
@@ -626,7 +616,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
             edge.getType(), edge.getTargetNode().getUuid(), 
edge.getTargetNode().getType());
 
         // sent in region (not offerTopic) as the delete IO happens in-region, 
then queues a multi-region de-index op
-        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), 
applicationScope, edge ) );
+        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), 
applicationScope, edge ), AsyncEventQueueType.DELETE );
     }
 
     private IndexOperationMessage  handleEdgeDelete(final LegacyQueueMessage 
message) {
@@ -660,9 +650,9 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
     /**
      * Queue up an indexOperationMessage for multi region execution
      * @param indexOperationMessage
-     * @param forUtilityQueue
+     * @param queueType
      */
-    public void queueIndexOperationMessage(final IndexOperationMessage 
indexOperationMessage, boolean forUtilityQueue) {
+    public void queueIndexOperationMessage(final IndexOperationMessage 
indexOperationMessage, AsyncEventQueueType queueType) {
 
         // don't try to produce something with nothing
         if(indexOperationMessage == null || indexOperationMessage.isEmpty()){
@@ -688,7 +678,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
 
         logger.trace("Offering ElasticsearchIndexEvent for message {}", 
newMessageId );
 
-        offerTopic( elasticsearchIndexEvent, forUtilityQueue );
+        offerTopic( elasticsearchIndexEvent, queueType );
     }
 
     private void handleIndexOperation(final ElasticsearchIndexEvent 
elasticsearchIndexEvent)
@@ -764,7 +754,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
             applicationScope.getApplication().getUuid(), entityId.getUuid(), 
entityId.getType());
 
         offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
-            new EntityIdScope( applicationScope, entityId), markedVersion), 
false);
+            new EntityIdScope( applicationScope, entityId), markedVersion), 
AsyncEventQueueType.DELETE );
 
     }
 
@@ -824,7 +814,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         logger.trace("Offering EntityDeleteEvent for {}:{}", 
entityId.getUuid(), entityId.getType());
 
         // sent in region (not offerTopic) as the delete IO happens in-region, 
then queues a multi-region de-index op
-        offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new 
EntityIdScope( applicationScope, entityId ) ) );
+        offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new 
EntityIdScope( applicationScope, entityId ) ),
+            AsyncEventQueueType.DELETE );
     }
 
     private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage 
message) {
@@ -879,25 +870,39 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
     public void start() {
         final int indexCount = indexProcessorFig.getWorkerCount();
         final int utilityCount = indexProcessorFig.getWorkerCountUtility();
+        final int deleteCount = indexProcessorFig.getWorkerCountDelete();
         final int indexDeadCount = 
indexProcessorFig.getWorkerCountDeadLetter();
         final int utilityDeadCount = 
indexProcessorFig.getWorkerCountUtilityDeadLetter();
-        logger.info("Starting queue workers for indexing: index={} indexDLQ={} 
utility={} utilityDLQ={}", indexCount, indexDeadCount,
-            utilityCount, utilityDeadCount);
+        final int deleteDeadCount = 
indexProcessorFig.getWorkerCountDeleteDeadLetter();
+        logger.info("Starting queue workers for indexing: index={} indexDLQ={} 
utility={} utilityDLQ={} delete={} deleteDLQ={}",
+            indexCount, indexDeadCount, utilityCount, utilityDeadCount, 
deleteCount, deleteDeadCount);
 
         for (int i = 0; i < indexCount; i++) {
-            startWorker(QUEUE_NAME);
+            startWorker(AsyncEventQueueType.REGULAR);
         }
 
         for (int i = 0; i < utilityCount; i++) {
-            startWorker(QUEUE_NAME_UTILITY);
+            startWorker(AsyncEventQueueType.UTILITY);
         }
 
-        for (int i = 0; i < indexDeadCount; i++) {
-            startDeadQueueWorker(QUEUE_NAME);
+        for (int i = 0; i < deleteCount; i++) {
+            startWorker(AsyncEventQueueType.DELETE);
         }
+        if( indexQueue instanceof SNSQueueManagerImpl) {
+            logger.info("Queue manager implementation supports dead letters, 
start dead letter queue workers.");
+            for (int i = 0; i < indexDeadCount; i++) {
+                startDeadQueueWorker(AsyncEventQueueType.REGULAR);
+            }
 
-        for (int i = 0; i < utilityDeadCount; i++) {
-            startDeadQueueWorker(QUEUE_NAME_UTILITY);
+            for (int i = 0; i < utilityDeadCount; i++) {
+                startDeadQueueWorker(AsyncEventQueueType.UTILITY);
+            }
+
+            for (int i = 0; i < deleteDeadCount; i++) {
+                startDeadQueueWorker(AsyncEventQueueType.DELETE);
+            }
+        }else{
+            logger.info("Queue manager implementation does NOT support dead 
letters, NOT starting dead letter queue worker.");
         }
     }
 
@@ -916,11 +921,10 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
     }
 
 
-    private void startWorker(final String type) {
-        Preconditions.checkNotNull(type, "Worker type required");
+    private void startWorker(final AsyncEventQueueType queueType) {
         synchronized (mutex) {
 
-            boolean isUtilityQueue = isNotEmpty(type) && 
type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+            String type = getQueueName(queueType);
 
             Observable<List<LegacyQueueMessage>> consumer =
                 Observable.create( new 
Observable.OnSubscribe<List<LegacyQueueMessage>>() {
@@ -928,20 +932,15 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
                     public void call( final Subscriber<? super 
List<LegacyQueueMessage>> subscriber ) {
 
                         //name our thread so it's easy to see
-                        long threadNum = isUtilityQueue ?
-                            counterUtility.incrementAndGet() : 
counter.incrementAndGet();
-                        Thread.currentThread().setName( "QueueConsumer_" + 
type+ "_" + threadNum );
+                        long threadNum = getCounter(queueType, 
false).incrementAndGet();
+                        Thread.currentThread().setName( "QueueConsumer_" + 
type + "_" + threadNum );
 
                         List<LegacyQueueMessage> drainList = null;
 
                         do {
                             try {
-                                if ( isUtilityQueue ){
-                                    drainList = takeFromUtilityQueue();
-                                }else{
-                                    drainList = take();
+                                drainList = take(queueType);
 
-                                }
                                 //emit our list in it's entity to hand off to 
a worker pool
                                 subscriber.onNext(drainList);
 
@@ -997,7 +996,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
 
                                     // submit the processed messages to index 
producer
                                     List<LegacyQueueMessage> messagesToAck =
-                                        submitToIndex( indexEventResults, 
isUtilityQueue );
+                                        submitToIndex( indexEventResults, 
queueType );
 
                                     if ( messagesToAck.size() < 
messages.size() ) {
                                         logger.warn(
@@ -1007,12 +1006,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
 
                                     // ack each message if making it to this 
point
                                     if( messagesToAck.size() > 0 ){
-
-                                        if ( isUtilityQueue ){
-                                            ackUtilityQueue( messagesToAck );
-                                        }else{
-                                            ack( messagesToAck );
-                                        }
+                                        ack(messagesToAck, queueType, false);
                                     }
 
                                     return messagesToAck;
@@ -1036,129 +1030,112 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
     }
 
 
-    private void startDeadQueueWorker(final String type) {
-        Preconditions.checkNotNull(type, "Worker type required");
+    private void startDeadQueueWorker(final AsyncEventQueueType queueType) {
+        String type = getQueueName(queueType);
         synchronized (mutex) {
 
-            boolean isUtilityDeadQueue = isNotEmpty(type) && 
type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
-
             Observable<List<LegacyQueueMessage>> consumer =
-                    Observable.create( new 
Observable.OnSubscribe<List<LegacyQueueMessage>>() {
-                        @Override
-                        public void call( final Subscriber<? super 
List<LegacyQueueMessage>> subscriber ) {
+                Observable.create( new 
Observable.OnSubscribe<List<LegacyQueueMessage>>() {
+                    @Override
+                    public void call( final Subscriber<? super 
List<LegacyQueueMessage>> subscriber ) {
 
-                            //name our thread so it's easy to see
-                            long threadNum = isUtilityDeadQueue ?
-                                counterUtilityDead.incrementAndGet() : 
counterIndexDead.incrementAndGet();
-                            Thread.currentThread().setName( 
"QueueDeadLetterConsumer_" + type+ "_" + threadNum );
+                        //name our thread so it's easy to see
+                        long threadNum = getCounter(queueType, 
true).incrementAndGet();
+                        Thread.currentThread().setName( 
"QueueDeadLetterConsumer_" + type + "_" + threadNum );
 
-                            List<LegacyQueueMessage> drainList = null;
+                        List<LegacyQueueMessage> drainList = null;
 
-                            do {
-                                try {
-                                    if ( isUtilityDeadQueue ){
-                                        drainList = takeFromUtilityDeadQueue();
-                                    }else{
-                                        drainList = takeFromIndexDeadQueue();
-                                    }
-                                    //emit our list in it's entity to hand off 
to a worker pool
-                                    subscriber.onNext(drainList);
+                        do {
+                            try {
+                                drainList = take(queueType, true);
 
-                                    //take since  we're in flight
-                                    inFlight.addAndGet( drainList.size() );
+                                //emit our list in it's entity to hand off to 
a worker pool
+                                subscriber.onNext(drainList);
 
-                                } catch ( Throwable t ) {
+                                //take since  we're in flight
+                                inFlight.addAndGet( drainList.size() );
 
-                                    final long sleepTime = 
indexProcessorFig.getDeadLetterFailureRetryTime();
+                            } catch ( Throwable t ) {
 
-                                    // there might be an error here during 
tests, just clean the cache
-                                    indexQueueDead.clearQueueNameCache();
+                                final long sleepTime = 
indexProcessorFig.getDeadLetterFailureRetryTime();
 
-                                    if ( t instanceof InvalidQueryException ) {
+                                // there might be an error here during tests, 
just clean the cache
+                                indexQueueDead.clearQueueNameCache();
 
-                                        // don't fill up log with exceptions 
when keyspace and column
-                                        // families are not ready during 
bootstrap/setup
-                                        logger.warn( "Failed to dequeue dead 
letters due to '{}'. Sleeping for {} ms",
-                                            t.getMessage(), sleepTime );
+                                if ( t instanceof InvalidQueryException ) {
 
-                                    } else {
-                                        logger.error( "Failed to dequeue dead 
letters. Sleeping for {} ms", sleepTime, t);
-                                    }
+                                    // don't fill up log with exceptions when 
keyspace and column
+                                    // families are not ready during 
bootstrap/setup
+                                    logger.warn( "Failed to dequeue dead 
letters due to '{}'. Sleeping for {} ms",
+                                        t.getMessage(), sleepTime );
 
-                                    if ( drainList != null ) {
-                                        inFlight.addAndGet( -1 * 
drainList.size() );
-                                    }
+                                } else {
+                                    logger.error( "Failed to dequeue dead 
letters. Sleeping for {} ms", sleepTime, t);
+                                }
 
-                                    try { Thread.sleep( sleepTime ); } catch ( 
InterruptedException ie ) {}
+                                if ( drainList != null ) {
+                                    inFlight.addAndGet( -1 * drainList.size() 
);
                                 }
+
+                                try { Thread.sleep( sleepTime ); } catch ( 
InterruptedException ie ) {}
                             }
-                            while ( true );
                         }
-                    } )        //this won't block our read loop, just reads 
and proceeds
-                        .flatMap( sqsMessages -> {
-
-                            //do this on a different schedule, and introduce 
concurrency
-                            // with flatmap for faster processing
-                            return Observable.just( sqsMessages )
-
-                                             .map( messages -> {
-                                                 if ( messages == null || 
messages.size() == 0 ) {
-                                                     // no messages came from 
the queue, move on
-                                                     return null;
-                                                 }
-
-                                                 try {
-                                                     // put the dead letter 
messages back in the appropriate queue
-                                                     LegacyQueueManager 
returnQueue = null;
-                                                     String queueType;
-                                                     if (isUtilityDeadQueue) {
-                                                         returnQueue = 
utilityQueue;
-                                                         queueType = "utility";
-                                                     } else {
-                                                         returnQueue = 
indexQueue;
-                                                         queueType = "index";
-                                                     }
-                                                     List<LegacyQueueMessage> 
successMessages = returnQueue.sendQueueMessages(messages);
-                                                     for (LegacyQueueMessage 
msg : successMessages) {
-                                                         
logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", 
queueType, msg.getType(), msg.getMessageId(), msg.getStringBody());
-                                                     }
-                                                     int 
unsuccessfulMessagesSize = messages.size() - successMessages.size();
-                                                     if 
(unsuccessfulMessagesSize > 0) {
-                                                         // some messages 
couldn't be sent to originating queue, log
-                                                         Set<String> 
successMessageIds = new HashSet<>();
-                                                         for 
(LegacyQueueMessage msg : successMessages) {
-                                                             String messageId 
= msg.getMessageId();
-                                                             if 
(successMessageIds.contains(messageId)) {
-                                                                 
logger.warn("Found duplicate messageId in returned messages: {}", messageId);
-                                                             } else {
-                                                                 
successMessageIds.add(messageId);
-                                                             }
-                                                         }
-                                                         for 
(LegacyQueueMessage msg : messages) {
-                                                             String messageId 
= msg.getMessageId();
-                                                             if 
(!successMessageIds.contains(messageId)) {
-                                                                 
logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: 
{}", queueType, msg.getType(), messageId, msg.getStringBody());
-                                                             }
-                                                         }
-                                                     }
-
-                                                     if (isUtilityDeadQueue) {
-                                                         
ackUtilityDeadQueue(successMessages);
-                                                     } else {
-                                                         
ackIndexDeadQueue(successMessages);
-                                                     }
-
-                                                     return messages;
-                                                 }
-                                                 catch ( Exception e ) {
-                                                     logger.error( "Failed to 
ack messages", e );
-                                                     return null;
-                                                     //do not rethrow so we 
can process all of them
-                                                 }
-                                             } ).subscribeOn( 
rxTaskScheduler.getAsyncIOScheduler() );
-
-                            //end flatMap
-                        }, indexProcessorFig.getEventConcurrencyFactor() );
+                        while ( true );
+                    }
+                } )        //this won't block our read loop, just reads and 
proceeds
+                    .flatMap( sqsMessages -> {
+
+                        //do this on a different schedule, and introduce 
concurrency
+                        // with flatmap for faster processing
+                        return Observable.just( sqsMessages )
+
+                            .map( messages -> {
+                                if ( messages == null || messages.size() == 0 
) {
+                                    // no messages came from the queue, move on
+                                    return null;
+                                }
+
+                                try {
+                                    // put the dead letter messages back in 
the appropriate queue
+                                    LegacyQueueManager returnQueue = 
getQueue(queueType, false);
+
+                                    List<LegacyQueueMessage> successMessages = 
returnQueue.sendQueueMessages(messages);
+                                    for (LegacyQueueMessage msg : 
successMessages) {
+                                        logger.warn("Returning message to {} 
queue: type:{}, messageId:{} body: {}", queueType.toString(), msg.getType(), 
msg.getMessageId(), msg.getStringBody());
+                                    }
+                                    int unsuccessfulMessagesSize = 
messages.size() - successMessages.size();
+                                    if (unsuccessfulMessagesSize > 0) {
+                                        // some messages couldn't be sent to 
originating queue, log
+                                        Set<String> successMessageIds = new 
HashSet<>();
+                                        for (LegacyQueueMessage msg : 
successMessages) {
+                                            String messageId = 
msg.getMessageId();
+                                            if 
(successMessageIds.contains(messageId)) {
+                                                logger.warn("Found duplicate 
messageId in returned messages: {}", messageId);
+                                            } else {
+                                                
successMessageIds.add(messageId);
+                                            }
+                                        }
+                                        for (LegacyQueueMessage msg : 
messages) {
+                                            String messageId = 
msg.getMessageId();
+                                            if 
(!successMessageIds.contains(messageId)) {
+                                                logger.warn("Failed to return 
message to {} queue: type:{} messageId:{} body: {}", queueType.toString(), 
msg.getType(), messageId, msg.getStringBody());
+                                            }
+                                        }
+                                    }
+
+                                    ack(successMessages, queueType, true);
+
+                                    return messages;
+                                }
+                                catch ( Exception e ) {
+                                    logger.error( "Failed to ack messages", e 
);
+                                    return null;
+                                    //do not rethrow so we can process all of 
them
+                                }
+                            } ).subscribeOn( 
rxTaskScheduler.getAsyncIOScheduler() );
+
+                        //end flatMap
+                    }, indexProcessorFig.getEventConcurrencyFactor() );
 
             //start in the background
 
@@ -1172,7 +1149,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
      * Submit results to index and return the queue messages to be ack'd
      *
      */
-    private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> 
indexEventResults, boolean forUtilityQueue) {
+    private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> 
indexEventResults, AsyncEventQueueType queueType) {
 
         // if nothing came back then return empty list
         if(indexEventResults==null){
@@ -1199,7 +1176,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
             // collect into a list of QueueMessages that can be ack'd later
             .collect(Collectors.toList());
 
-       queueIndexOperationMessage(combined, forUtilityQueue);
+        queueIndexOperationMessage(combined, queueType);
 
         return queueMessages;
     }
@@ -1210,10 +1187,10 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
             new EntityIndexOperation( applicationScope, id, updatedSince);
 
         queueIndexOperationMessage(
-            eventBuilder.buildEntityIndex( entityIndexOperation 
).toBlocking().lastOrDefault(null), false);
+            eventBuilder.buildEntityIndex( entityIndexOperation 
).toBlocking().lastOrDefault(null), AsyncEventQueueType.REGULAR );
     }
 
-    public void indexBatch(final List<EdgeScope> edges, final long 
updatedSince, boolean forUtilityQueue) {
+    public void indexBatch(final List<EdgeScope> edges, final long 
updatedSince, AsyncEventQueueType queueType) {
 
         final List<EntityIndexEvent> batch = new ArrayList<>();
         edges.forEach(e -> {
@@ -1226,7 +1203,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
 
         logger.trace("Offering batch of EntityIndexEvent of size {}", 
batch.size());
 
-        offerBatch( batch, forUtilityQueue );
+        offerBatch( batch, queueType );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7eecf04..eb63056 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -40,10 +40,14 @@ public interface IndexProcessorFig extends GuicyFig {
 
     String ELASTICSEARCH_WORKER_COUNT_UTILITY = 
"elasticsearch.worker_count_utility";
 
+    String ELASTICSEARCH_WORKER_COUNT_DELETE = 
"elasticsearch.worker_count_delete";
+
     String ELASTICSEARCH_WORKER_COUNT_DEADLETTER = 
"elasticsearch.worker_count_deadletter";
 
     String ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER = 
"elasticsearch.worker_count_utility_deadletter";
 
+    String ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER = 
"elasticsearch.worker_count_delete_deadletter";
+
     String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
 
     String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
@@ -105,6 +109,13 @@ public interface IndexProcessorFig extends GuicyFig {
     int getWorkerCountUtility();
 
     /**
+     * The number of worker threads used to read delete requests from the 
queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_DELETE)
+    int getWorkerCountDelete();
+
+    /**
      * The number of worker threads used to read dead messages from the index 
dead letter queue and reload them into the index queue.
      */
     @Default("1")
@@ -119,6 +130,13 @@ public interface IndexProcessorFig extends GuicyFig {
     int getWorkerCountUtilityDeadLetter();
 
     /**
+     * The number of worker threads used to read dead messages from the delete 
dead letter queue and reload them into the delete queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER)
+    int getWorkerCountDeleteDeadLetter();
+
+    /**
      * Set the implementation to use for queuing.
      * Valid values: TEST, LOCAL, SQS, SNS
      * NOTE: SQS and SNS equate to the same implementation of Amazon queue 
services.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 2b3573e..d6bdd93 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,7 +43,7 @@ public interface ReIndexAction {
      * Index a batch list of entities.  Goes to the utility queue.
      * @param edges
      * @param updatedSince
-     * @param forUtilityQueue
+     * @param queueType
      */
-    void indexBatch(final List<EdgeScope> edges, final long updatedSince, 
boolean forUtilityQueue);
+    void indexBatch(final List<EdgeScope> edges, final long updatedSince, 
AsyncEventQueueType queueType);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 0660d5e..c7371b3 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,7 +170,7 @@ public class ReIndexServiceImpl implements ReIndexService {
             .buffer( indexProcessorFig.getReindexBufferSize())
             .doOnNext( edgeScopes -> {
                 logger.info("Sending batch of {} to be indexed.", 
edgeScopes.size());
-                indexService.indexBatch(edgeScopes, modifiedSince, true);
+                indexService.indexBatch(edgeScopes, modifiedSince, 
AsyncEventQueueType.UTILITY);
                 count.addAndGet(edgeScopes.size() );
                 if( edgeScopes.size() > 0 ) {
                     writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 
1));

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 83f4c8b..4886b08 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -124,7 +125,7 @@ public abstract class AbstractReadGraphFilter extends 
AbstractPathFilter<Id, Id,
                     final Observable<IndexOperationMessage> 
indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, 
markedEdge);
 
                     indexMessageObservable
-                        .compose(applyCollector())
+                        .compose(applyCollector(AsyncEventQueueType.DELETE))
                         .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
 
@@ -139,7 +140,7 @@ public abstract class AbstractReadGraphFilter extends 
AbstractPathFilter<Id, Id,
                         entityDeleteResults = 
eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
 
                     entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
+                        .compose(applyCollector(AsyncEventQueueType.DELETE))
                         .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
 
@@ -159,7 +160,7 @@ public abstract class AbstractReadGraphFilter extends 
AbstractPathFilter<Id, Id,
                         entityDeleteResults = 
eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
 
                     entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
+                        .compose(applyCollector(AsyncEventQueueType.DELETE))
                         .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
 
@@ -252,13 +253,13 @@ public abstract class AbstractReadGraphFilter extends 
AbstractPathFilter<Id, Id,
         }
     }
 
-    private Observable.Transformer<IndexOperationMessage, 
IndexOperationMessage> applyCollector() {
+    private Observable.Transformer<IndexOperationMessage, 
IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
 
         return observable -> observable
             .collect(() -> new IndexOperationMessage(), (collector, single) -> 
collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation, 
false);
+                asyncEventService.queueIndexOperationMessage(indexOperation, 
queueType);
             });
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
index 1afb524..e8a20dd 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -124,7 +125,7 @@ public abstract class AbstractReadReverseGraphFilter 
extends AbstractPathFilter<
                     final Observable<IndexOperationMessage> 
indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, 
markedEdge);
 
                     indexMessageObservable
-                        .compose(applyCollector())
+                        .compose(applyCollector(AsyncEventQueueType.DELETE))
                         .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
 
@@ -139,7 +140,7 @@ public abstract class AbstractReadReverseGraphFilter 
extends AbstractPathFilter<
                         entityDeleteResults = 
eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
 
                     entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
+                        .compose(applyCollector(AsyncEventQueueType.DELETE))
                         .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
 
@@ -159,7 +160,7 @@ public abstract class AbstractReadReverseGraphFilter 
extends AbstractPathFilter<
                         entityDeleteResults = 
eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
 
                     entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
+                        .compose(applyCollector(AsyncEventQueueType.DELETE))
                         .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
 
@@ -245,13 +246,13 @@ public abstract class AbstractReadReverseGraphFilter 
extends AbstractPathFilter<
         }
     }
 
-    private Observable.Transformer<IndexOperationMessage, 
IndexOperationMessage> applyCollector() {
+    private Observable.Transformer<IndexOperationMessage, 
IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
 
         return observable -> observable
             .collect(() -> new IndexOperationMessage(), (collector, single) -> 
collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation, 
false);
+                asyncEventService.queueIndexOperationMessage(indexOperation, 
queueType);
             });
 
     }

Reply via email to