Github user peterj99a commented on a diff in the pull request:
https://github.com/apache/usergrid/pull/575#discussion_r141456652
--- Diff:
stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
---
@@ -625,16 +677,57 @@ public void sendMessages( final List bodies ) throws
IOException {
return successMessages;
}
-
@Override
- public <T extends Serializable> void sendMessageToLocalRegion(final T
body ) throws IOException {
+ public <T extends Serializable> void sendMessageToLocalRegion(final T
body, Boolean async) throws IOException {
+ boolean sendAsync = async == null ? fig.isAsyncQueue() :
async.booleanValue();
+ if (sendAsync) {
+ sendMessageToLocalRegionAsync(body);
+ } else {
+ sendMessageToLocalRegionSync(body);
+ }
+ }
- if ( sqsAsync == null ) {
+ private <T extends Serializable> void
sendMessageToLocalRegionSync(final T body) throws IOException {
+
+ if ( sqs == null ) {
logger.error( "SQS client is null, perhaps it failed to
initialize successfully" );
return;
}
final String stringBody = toString( body );
+ if (logger.isDebugEnabled()) {
+ logger.debug(" sendMessageToLocalRegion " + stringBody);
+ }
+
+ String url = getReadQueue().getUrl();
+
+ if ( logger.isTraceEnabled() ) {
+ logger.trace( "Publishing Message...{} to url: {}",
stringBody, url );
+ }
+
+ SendMessageRequest messageRequest = new SendMessageRequest(url,
stringBody);
+ try {
+ SendMessageResult result = sqs.sendMessage(messageRequest);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Successfully published... messageID=[{}],
arn=[{}]", result.getMessageId(),
+ url);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to send this message {}. To this address
{}. Error was ", messageRequest.getMessageBody(), url, e);
--- End diff --
ditto
---