1. Remove messages from the dead letter queue if the message map entry has 
still not been received after a configurable timeout (currently 15 minutes).
2. Log messages as they are moved from the dead letter queue back to the 
originating queue. Also log messages that are in the dead letter queue but 
cannot be moved back to the originating queue for any reason.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c748242f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c748242f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c748242f

Branch: refs/heads/asf-site
Commit: c748242fa6a9bc79ebb60b4071cc532ab798fb3c
Parents: 5935460
Author: Mike Dunker <mdun...@google.com>
Authored: Mon Jun 19 12:18:34 2017 -0700
Committer: Mike Dunker <mdun...@google.com>
Committed: Mon Jun 19 12:18:34 2017 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 37 +++++++++++++++++---
 .../persistence/queue/LegacyQueueFig.java       |  3 ++
 2 files changed, 35 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/c748242f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 7c33969..530cf7d 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -724,10 +724,13 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
                indexOperationMessage =
                    ObjectJsonSerializer.INSTANCE.fromString(highConsistency, 
IndexOperationMessage.class);
 
+           } else if (System.currentTimeMillis() > 
elasticsearchIndexEvent.getCreationTime() + queueFig.getMapMessageTimeout()) {
+                // if esMapPersistence message hasn't been received yet, log 
and return (will be acked)
+                logger.error("ES map message never received, removing message 
from queue. indexBatchId={}", messageId);
+                return;
            } else {
-
-               throw new 
IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
-
+                logger.warn("ES map message not received yet. indexBatchId={} 
elapsedTimeMsec={}", messageId, System.currentTimeMillis() - 
elasticsearchIndexEvent.getCreationTime());
+                throw new 
IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
            }
 
         } else {
@@ -1105,14 +1108,38 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
                                                  try {
                                                      // put the dead letter 
messages back in the appropriate queue
                                                      LegacyQueueManager 
returnQueue = null;
+                                                     String queueType;
                                                      if (isUtilityDeadQueue) {
-                                                         logger.warn("Utility 
dead queue message count: {}", messages.size());
                                                          returnQueue = 
utilityQueue;
+                                                         queueType = "utility";
                                                      } else {
-                                                         logger.warn("Index 
dead queue message count: {}", messages.size());
                                                          returnQueue = 
indexQueue;
+                                                         queueType = "index";
                                                      }
                                                      List<LegacyQueueMessage> 
successMessages = returnQueue.sendQueueMessages(messages);
+                                                     for (LegacyQueueMessage 
msg : successMessages) {
+                                                         
logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", 
queueType, msg.getType(), msg.getMessageId(), msg.getStringBody());
+                                                     }
+                                                     int 
unsuccessfulMessagesSize = messages.size() - successMessages.size();
+                                                     if 
(unsuccessfulMessagesSize > 0) {
+                                                         // some messages 
couldn't be sent to originating queue, log
+                                                         Set<String> 
successMessageIds = new HashSet<>();
+                                                         for 
(LegacyQueueMessage msg : successMessages) {
+                                                             String messageId 
= msg.getMessageId();
+                                                             if 
(successMessageIds.contains(messageId)) {
+                                                                 
logger.warn("Found duplicate messageId in returned messages: {}", messageId);
+                                                             } else {
+                                                                 
successMessageIds.add(messageId);
+                                                             }
+                                                         }
+                                                         for 
(LegacyQueueMessage msg : messages) {
+                                                             String messageId 
= msg.getMessageId();
+                                                             if 
(!successMessageIds.contains(messageId)) {
+                                                                 
logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: 
{}", queueType, msg.getType(), messageId, msg.getStringBody());
+                                                             }
+                                                         }
+                                                     }
+
                                                      if (isUtilityDeadQueue) {
                                                          
ackUtilityDeadQueue(successMessages);
                                                      } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c748242f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
index 6fe96dd..0ebcc7b 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
@@ -106,4 +106,7 @@ public interface LegacyQueueFig extends GuicyFig {
     @Default("false") // 30 seconds
     boolean getQuorumFallback();
 
+    @Key("usergrid.queue.map.message.timeout")
+    @Default("900000") // 15 minutes
+    int getMapMessageTimeout();
 }

Reply via email to