Repository: usergrid Updated Branches: refs/heads/USERGRID-1048 3ec0f5886 -> d8e657219
Add sourceRegion to ElasticsearchIndexEvent, fix logging statements, update sqs/sns client null checks. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3a7e60b3 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3a7e60b3 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3a7e60b3 Branch: refs/heads/USERGRID-1048 Commit: 3a7e60b3131e207890354ca5fa84258795296372 Parents: 19d30ea Author: Michael Russo <michaelaru...@gmail.com> Authored: Mon Oct 19 17:07:18 2015 -0700 Committer: Michael Russo <michaelaru...@gmail.com> Committed: Mon Oct 19 17:07:18 2015 -0700 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 9 ++++++--- .../asyncevents/model/ElasticsearchIndexEvent.java | 3 ++- .../persistence/queue/impl/SNSQueueManagerImpl.java | 9 ++------- 3 files changed, 10 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/3a7e60b3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index 67d0dab..2b583b5 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -498,7 +498,8 @@ public class AmazonAsyncEventService implements AsyncEventService { //now queue up the index message - final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId ); + final ElasticsearchIndexEvent elasticsearchIndexEvent = + new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId ); //send to the topic so all regions index the batch @@ -520,12 +521,14 @@ public class AmazonAsyncEventService implements AsyncEventService { final IndexOperationMessage indexOperationMessage; if(message == null){ - logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" ); + logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level", + messageId); final String highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() ); if(highConsistency == null){ - logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" ); + logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level", + messageId); throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/3a7e60b3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java index 207b15e..049c3a5 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java @@ -35,7 +35,8 @@ public final class ElasticsearchIndexEvent extends AsyncEvent { public ElasticsearchIndexEvent() { } - public ElasticsearchIndexEvent( UUID indexBatchId ) { + public ElasticsearchIndexEvent(String sourceRegion, UUID indexBatchId) { + super(sourceRegion); this.indexBatchId = indexBatchId; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/3a7e60b3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index 58b2a4d..3c18992 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -543,11 +543,6 @@ public class SNSQueueManagerImpl implements QueueManager { @Override public void sendMessages( final List bodies ) throws IOException { - if ( snsAsync == null ) { - logger.error( "SNS client is null, perhaps it failed to initialize successfully" ); - return; - } - for ( Object body : bodies ) { sendMessage( ( Serializable ) body ); } @@ -557,8 +552,8 @@ public class SNSQueueManagerImpl implements QueueManager { @Override public <T extends Serializable> void sendMessage( final T body ) throws IOException { - if ( snsAsync == null ) { - logger.error( "SNS client is null, perhaps it failed to initialize successfully" ); + if ( sqsAsync == null ) { + logger.error( "SQS client is null, perhaps it failed to initialize successfully" ); return; }