1. Remove messages from the dead letter queue if the message map entry has still not been received after a configurable timeout (currently 15 minutes). 2. Log messages as they are moved from the dead letter queue back to the originating queue. Also log messages that are in the dead letter queue but could not be moved back to the originating queue.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c748242f Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c748242f Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c748242f Branch: refs/heads/asf-site Commit: c748242fa6a9bc79ebb60b4071cc532ab798fb3c Parents: 5935460 Author: Mike Dunker <mdun...@google.com> Authored: Mon Jun 19 12:18:34 2017 -0700 Committer: Mike Dunker <mdun...@google.com> Committed: Mon Jun 19 12:18:34 2017 -0700 ---------------------------------------------------------------------- .../asyncevents/AsyncEventServiceImpl.java | 37 +++++++++++++++++--- .../persistence/queue/LegacyQueueFig.java | 3 ++ 2 files changed, 35 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/c748242f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 7c33969..530cf7d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -724,10 +724,13 @@ public class AsyncEventServiceImpl implements AsyncEventService { indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class); + } else if (System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getMapMessageTimeout()) { + // if esMapPersistence message hasn't been received yet, log and return (will be acked) + logger.error("ES map message never received, removing 
message from queue. indexBatchId={}", messageId); + return; } else { - - throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId()); - + logger.warn("ES map message not received yet. indexBatchId={} elapsedTimeMsec={}", messageId, System.currentTimeMillis() - elasticsearchIndexEvent.getCreationTime()); + throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId()); } } else { @@ -1105,14 +1108,38 @@ public class AsyncEventServiceImpl implements AsyncEventService { try { // put the dead letter messages back in the appropriate queue LegacyQueueManager returnQueue = null; + String queueType; if (isUtilityDeadQueue) { - logger.warn("Utility dead queue message count: {}", messages.size()); returnQueue = utilityQueue; + queueType = "utility"; } else { - logger.warn("Index dead queue message count: {}", messages.size()); returnQueue = indexQueue; + queueType = "index"; } List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages); + for (LegacyQueueMessage msg : successMessages) { + logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType, msg.getType(), msg.getMessageId(), msg.getStringBody()); + } + int unsuccessfulMessagesSize = messages.size() - successMessages.size(); + if (unsuccessfulMessagesSize > 0) { + // some messages couldn't be sent to originating queue, log + Set<String> successMessageIds = new HashSet<>(); + for (LegacyQueueMessage msg : successMessages) { + String messageId = msg.getMessageId(); + if (successMessageIds.contains(messageId)) { + logger.warn("Found duplicate messageId in returned messages: {}", messageId); + } else { + successMessageIds.add(messageId); + } + } + for (LegacyQueueMessage msg : messages) { + String messageId = msg.getMessageId(); + if (!successMessageIds.contains(messageId)) { + logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType, msg.getType(), messageId, msg.getStringBody()); + } + } + } + if 
(isUtilityDeadQueue) { ackUtilityDeadQueue(successMessages); } else { http://git-wip-us.apache.org/repos/asf/usergrid/blob/c748242f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java index 6fe96dd..0ebcc7b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java @@ -106,4 +106,7 @@ public interface LegacyQueueFig extends GuicyFig { @Default("false") // 30 seconds boolean getQuorumFallback(); + @Key("usergrid.queue.map.message.timeout") + @Default("900000") // 15 minutes + int getMapMessageTimeout(); }