Make sure that the second-hop index event lands on the utility queue if it was originally consumed from that queue.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a4cc63f8 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a4cc63f8 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a4cc63f8 Branch: refs/heads/asf-site Commit: a4cc63f8ca1ce16a2057d7219c3b5606ac3f464d Parents: f8c92f6 Author: Michael Russo <mru...@apigee.com> Authored: Mon Oct 3 23:37:59 2016 -0700 Committer: Michael Russo <mru...@apigee.com> Committed: Mon Oct 3 23:37:59 2016 -0700 ---------------------------------------------------------------------- .../corepersistence/asyncevents/AsyncEventServiceImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/a4cc63f8/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index bc5d139..dba4edf 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -864,7 +864,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { List<IndexEventResult> indexEventResults = callEventHandlers( messages ); // submit the processed messages to index producer - List<QueueMessage> messagesToAck = submitToIndex( indexEventResults ); + List<QueueMessage> messagesToAck = submitToIndex( indexEventResults, isUtilityQueue ); if ( messagesToAck.size() < messages.size() ) { logger.warn( "Missing {} message(s) from index processing", @@ -904,7 +904,7 @@ public class 
AsyncEventServiceImpl implements AsyncEventService { * Submit results to index and return the queue messages to be ack'd * */ - private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) { + private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) { // if nothing came back then return empty list if(indexEventResults==null){ @@ -931,7 +931,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { // collect into a list of QueueMessages that can be ack'd later .collect(Collectors.toList()); - queueIndexOperationMessage(combined, false); + queueIndexOperationMessage(combined, forUtilityQueue); return queueMessages; }