Repository: usergrid Updated Branches: refs/heads/USERGRID-1048 94a907812 -> 04a3f47bd
Fixes serialization tests and verifies full end to end functionality. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/04a3f47b Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/04a3f47b Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/04a3f47b Branch: refs/heads/USERGRID-1048 Commit: 04a3f47bd86ec0674ff487f479327e0f174f0425 Parents: 94a9078 Author: Todd Nine <tn...@apigee.com> Authored: Mon Oct 19 12:00:56 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Mon Oct 19 12:00:56 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 70 ++- .../asyncevents/model/AsyncEvent.java | 5 +- .../index/IndexProcessorFig.java | 7 +- .../util/ObjectJsonSerializer.java | 28 +- .../persistence/queue/QueueManager.java | 2 +- .../persistence/queue/guice/QueueModule.java | 1 - .../queue/impl/SNSQueueManagerImpl.java | 515 ++++++++++--------- .../queue/impl/SQSQueueManagerImpl.java | 362 ------------- 8 files changed, 336 insertions(+), 654 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index c9f0953..d319ac8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -29,19 +29,13 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; -import com.google.common.base.Optional; - -import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent; -import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer; -import org.apache.usergrid.exception.NotImplementedException; -import org.apache.usergrid.persistence.index.impl.IndexProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent; import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent; import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent; +import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent; import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent; import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent; import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent; @@ -50,6 +44,8 @@ import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; import org.apache.usergrid.corepersistence.index.IndexProcessorFig; import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy; import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; @@ -61,6 +57,7 @@ import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.IndexLocationStrategy; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.index.impl.IndexProducer; import org.apache.usergrid.persistence.map.MapManager; import org.apache.usergrid.persistence.map.MapManagerFactory; import org.apache.usergrid.persistence.map.MapScope; @@ -78,6 +75,7 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -94,8 +92,6 @@ public class AmazonAsyncEventService implements AsyncEventService { private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class); - private static final ObjectJsonSerializer OBJECT_JSON_SERIALIZER = new ObjectJsonSerializer( ); - // SQS maximum receive messages is 10 private static final int MAX_TAKE = 10; public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars @@ -192,6 +188,22 @@ public class AmazonAsyncEventService implements AsyncEventService { } } + + private void offerTopic( final Serializable operation ) { + final Timer.Context timer = this.writeTimer.time(); + + try { + //signal to SQS + this.queue.sendMessageToTopic( operation ); + } + catch ( IOException e ) { + throw new RuntimeException( "Unable to queue message", e ); + } + finally { + timer.stop(); + } + } + private void offerBatch(final List operations){ final Timer.Context timer = this.writeTimer.time(); @@ -226,27 +238,6 @@ public class AmazonAsyncEventService implements AsyncEventService { } - /** - * Ack message in SQS - */ - public void ack(final QueueMessage message) { - - final Timer.Context timer = this.ackTimer.time(); - - try{ - queue.commitMessage( message ); - - //decrement our in-flight counter - inFlight.decrementAndGet(); - - }catch(Exception e){ - throw new RuntimeException("Unable to ack messages", e); - }finally { - timer.stop(); - } - - - } /** * Ack message in SQS @@ -355,7 +346,8 @@ public class AmazonAsyncEventService implements AsyncEventService { public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) { IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy( applicationScope ); - offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy))); + offerTopic( + new InitializeApplicationIndexEvent( new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) ); } @@ -468,7 +460,7 @@ public class AmazonAsyncEventService implements AsyncEventService { */ public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) { - final String jsonValue = OBJECT_JSON_SERIALIZER.toByteBuffer( indexOperationMessage ); + final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage ); final UUID newMessageId = UUIDGenerator.newTimeUUID(); @@ -482,12 +474,8 @@ public class AmazonAsyncEventService implements AsyncEventService { final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId ); //send to the topic so all regions index the batch - try { - queue.sendMessageToTopic( elasticsearchIndexEvent ); - } - catch ( IOException e ) { - throw new RuntimeException( "Unable to pulish to topic", e ); - } + + offerTopic( elasticsearchIndexEvent ); } public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){ @@ -505,7 +493,7 @@ public class AmazonAsyncEventService implements AsyncEventService { String highConsistency = null; if(message == null){ - logger.error( "Receive message with id {} to process, unable to find it, reading with higher consistency level" ); + logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" ); highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() ); @@ -517,11 +505,11 @@ public class AmazonAsyncEventService implements AsyncEventService { //our original local read has it, parse it. if(message != null){ - indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( message, IndexOperationMessage.class ); + indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class ); } //we tried to read it at a higher consistency level and it works else if (highConsistency != null){ - indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( highConsistency, IndexOperationMessage.class ); + indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class ); } //we couldn't find it, bail http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java index 1af54e3..7c51003 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java @@ -30,8 +30,11 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; /** * Marker class for serialization + * + * Note that when you add a subtype, you will need to add it's serialization value below in the JsonSubTypes annotation. + * + * Each name must be unique, and must map to a subclass that is serialized */ - @JsonIgnoreProperties( ignoreUnknown = true ) @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" ) @JsonSubTypes( { http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java index 6fd73b4..ec9b315 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@ -105,10 +105,13 @@ public interface IndexProcessorFig extends GuicyFig { boolean resolveSynchronously(); /** - * Get the message TTL in milliseconds + * Get the message TTL in milliseconds. Defaults to 24 hours + * + * 24 * 60 * 60 * 1000 + * * @return */ - @Default("604800000") + @Default("86400000") @Key( "elasticsearch.message.ttl" ) int getIndexMessageTtl(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java index dbd5ca3..4e5873a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.Serializable; import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; @@ -33,16 +34,33 @@ import com.google.common.base.Preconditions; public final class ObjectJsonSerializer { - private final JsonFactory JSON_FACTORY = new JsonFactory(); + private static final JsonFactory JSON_FACTORY = new JsonFactory(); - private final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY ); + private static final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY ); + + static{ + + /** + * Because of the way SNS escapes all our json, we have to tell jackson to accept it. See the documentation + * here for how SNS borks the message body + * + * http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html + */ + MAPPER.configure( JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true ); + } + + /** + * Singleton instance of our serializer, instantiating it and configuring the mapper is expensive. + */ + public static final ObjectJsonSerializer INSTANCE = new ObjectJsonSerializer(); + + + private ObjectJsonSerializer( ) { - public ObjectJsonSerializer( ) { - MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" ); } - public <T extends Serializable> String toByteBuffer( final T toSerialize ) { + public <T extends Serializable> String toString( final T toSerialize ) { Preconditions.checkNotNull( toSerialize, "toSerialize must not be null" ); final String stringValue; http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java index dc3d1b5..34a3654 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java @@ -68,7 +68,7 @@ public interface QueueManager { * @param body * @throws IOException */ - <T extends Serializable> void sendMessage(T body)throws IOException; + <T extends Serializable> void sendMessage(T body)throws IOException; /** * Send a messae to the topic to be sent to other queues http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java index dd1fe16..caf61bf 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java @@ -26,7 +26,6 @@ import org.safehaus.guicyfig.GuicyFigModule; import org.apache.usergrid.persistence.queue.QueueFig; import org.apache.usergrid.persistence.queue.QueueManager; import org.apache.usergrid.persistence.queue.QueueManagerFactory; -import org.apache.usergrid.persistence.queue.impl.SQSQueueManagerImpl; import com.google.inject.AbstractModule; import com.google.inject.assistedinject.FactoryModuleBuilder; http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index 59ecd24..a3fa05e 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -68,6 +68,7 @@ import com.amazonaws.services.sqs.model.ReceiveMessageResult; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; @@ -77,9 +78,10 @@ import com.google.common.cache.LoadingCache; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; + public class SNSQueueManagerImpl implements QueueManager { - private static final Logger logger = LoggerFactory.getLogger(SNSQueueManagerImpl.class); + private static final Logger logger = LoggerFactory.getLogger( SNSQueueManagerImpl.class ); private final QueueScope scope; private final QueueFig fig; @@ -91,55 +93,64 @@ public class SNSQueueManagerImpl implements QueueManager { private final AmazonSQSAsyncClient sqsAsync; - private final JsonFactory JSON_FACTORY = new JsonFactory(); - private final ObjectMapper mapper = new ObjectMapper(JSON_FACTORY); + private static final JsonFactory JSON_FACTORY = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper( JSON_FACTORY ); + + static { + + /** + * Because of the way SNS escapes all our json, we have to tell jackson to accept it. See the documentation + * here for how SNS borks the message body + * + * http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html + */ + mapper.configure( JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true ); + } - private final LoadingCache<String, String> writeTopicArnMap = CacheBuilder.newBuilder() - .maximumSize(1000) - .build(new CacheLoader<String, String>() { + private final LoadingCache<String, String> writeTopicArnMap = + CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, String>() { @Override - public String load(String queueName) - throws Exception { + public String load( String queueName ) throws Exception { - return setupTopics(queueName); + return setupTopics( queueName ); } - }); + } ); - private final LoadingCache<String, Queue> readQueueUrlMap = CacheBuilder.newBuilder() - .maximumSize(1000) - .build(new CacheLoader<String, Queue>() { + private final LoadingCache<String, Queue> readQueueUrlMap = + CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, Queue>() { @Override - public Queue load(String queueName) throws Exception { + public Queue load( String queueName ) throws Exception { Queue queue = null; try { - GetQueueUrlResult result = sqs.getQueueUrl(queueName); - queue = new Queue(result.getQueueUrl()); - } catch (QueueDoesNotExistException queueDoesNotExistException) { - logger.error("Queue {} does not exist, will create", queueName); - } catch (Exception e) { - logger.error("failed to get queue from service", e); + GetQueueUrlResult result = sqs.getQueueUrl( queueName ); + queue = new Queue( result.getQueueUrl() ); + } + catch ( QueueDoesNotExistException queueDoesNotExistException ) { + logger.error( "Queue {} does not exist, will create", queueName ); + } + catch ( Exception e ) { + logger.error( "failed to get queue from service", e ); throw e; } - if (queue == null) { - String url = AmazonNotificationUtils.createQueue(sqs, queueName, fig); - queue = new Queue(url); + if ( queue == null ) { + String url = AmazonNotificationUtils.createQueue( sqs, queueName, fig ); + queue = new Queue( url ); } - setupTopics(queueName); + setupTopics( queueName ); return queue; } - }); - + } ); @Inject - public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig, - CassandraFig cassandraFig, QueueFig queueFig) { + public SNSQueueManagerImpl( @Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig, + CassandraFig cassandraFig, QueueFig queueFig ) { this.scope = scope; this.fig = fig; this.clusterFig = clusterFig; @@ -148,177 +159,179 @@ public class SNSQueueManagerImpl implements QueueManager { // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks final ExecutorService executor = TaskExecutorFactory - .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(), - TaskExecutorFactory.RejectionAction.CALLERRUNS); + .createTaskExecutor( "amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(), + TaskExecutorFactory.RejectionAction.CALLERRUNS ); final Region region = getRegion(); try { - sqs = createSQSClient(region); - sns = createSNSClient(region); - snsAsync = createAsyncSNSClient(region, executor); + sqs = createSQSClient( region ); + sns = createSNSClient( region ); + snsAsync = createAsyncSNSClient( region, executor ); sqsAsync = createAsyncSQSClient( region, executor ); - - } catch (Exception e) { - throw new RuntimeException("Error setting up mapper", e); + } + catch ( Exception e ) { + throw new RuntimeException( "Error setting up mapper", e ); } } - private String setupTopics(final String queueName) - throws Exception { - logger.info("Setting up setupTopics SNS/SQS..."); + private String setupTopics( final String queueName ) throws Exception { - String primaryTopicArn = AmazonNotificationUtils.getTopicArn(sns, queueName, true); + logger.info( "Setting up setupTopics SNS/SQS..." ); - if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryTopicArn=" + primaryTopicArn); + String primaryTopicArn = AmazonNotificationUtils.getTopicArn( sns, queueName, true ); - String queueUrl = AmazonNotificationUtils.getQueueUrlByName(sqs, queueName); - String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName(sqs, queueName); + if ( logger.isDebugEnabled() ) { + logger.debug( "SNS/SQS Setup: primaryTopicArn=" + primaryTopicArn ); + } + + String queueUrl = AmazonNotificationUtils.getQueueUrlByName( sqs, queueName ); + String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName( sqs, queueName ); - if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryQueueArn=" + primaryQueueArn); + if ( logger.isDebugEnabled() ) { + logger.debug( "SNS/SQS Setup: primaryQueueArn=" + primaryQueueArn ); + } - if (primaryQueueArn == null) { - if (logger.isDebugEnabled()) - logger.debug("SNS/SQS Setup: primaryQueueArn is null, creating queue..."); + if ( primaryQueueArn == null ) { + if ( logger.isDebugEnabled() ) { + logger.debug( "SNS/SQS Setup: primaryQueueArn is null, creating queue..." ); + } - queueUrl = AmazonNotificationUtils.createQueue(sqs, queueName, fig); - primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl(sqs, queueUrl); + queueUrl = AmazonNotificationUtils.createQueue( sqs, queueName, fig ); + primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl( sqs, queueUrl ); - if (logger.isDebugEnabled()) - logger.debug("SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn); + if ( logger.isDebugEnabled() ) { + logger.debug( "SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn ); + } } try { - SubscribeRequest primarySubscribeRequest = new SubscribeRequest(primaryTopicArn, "sqs", primaryQueueArn); - sns.subscribe(primarySubscribeRequest); + SubscribeRequest primarySubscribeRequest = new SubscribeRequest( primaryTopicArn, "sqs", primaryQueueArn ); + sns.subscribe( primarySubscribeRequest ); // ensure the SNS primary topic has permission to send to the primary SQS queue List<String> primaryTopicArnList = new ArrayList<>(); - primaryTopicArnList.add(primaryTopicArn); - AmazonNotificationUtils.setQueuePermissionsToReceive(sqs, queueUrl, primaryTopicArnList); - } catch (AmazonServiceException e) { - logger.error(String.format("Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn), e); + primaryTopicArnList.add( primaryTopicArn ); + AmazonNotificationUtils.setQueuePermissionsToReceive( sqs, queueUrl, primaryTopicArnList ); + } + catch ( AmazonServiceException e ) { + logger.error( + String.format( "Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn ), e ); } - if (fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALL) { + if ( fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALL ) { String multiRegion = fig.getRegionList(); - if (logger.isDebugEnabled()) - logger.debug("MultiRegion Setup specified, regions: [{}]", multiRegion); + if ( logger.isDebugEnabled() ) { + logger.debug( "MultiRegion Setup specified, regions: [{}]", multiRegion ); + } - String[] regionNames = multiRegion.split(","); + String[] regionNames = multiRegion.split( "," ); - final Map<String, String> arrQueueArns = new HashMap<>(regionNames.length + 1); - final Map<String, String> topicArns = new HashMap<>(regionNames.length + 1); + final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 ); + final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 ); - arrQueueArns.put(primaryQueueArn, fig.getRegion()); - topicArns.put(primaryTopicArn, fig.getRegion()); + arrQueueArns.put( primaryQueueArn, fig.getRegion() ); + topicArns.put( primaryTopicArn, fig.getRegion() ); - for (String regionName : regionNames) { + for ( String regionName : regionNames ) { regionName = regionName.trim(); - Regions regions = Regions.fromName(regionName); - Region region = Region.getRegion(regions); + Regions regions = Regions.fromName( regionName ); + Region region = Region.getRegion( regions ); - AmazonSQSClient sqsClient = createSQSClient(region); - AmazonSNSClient snsClient = createSNSClient(region); // do this stuff synchronously + AmazonSQSClient sqsClient = createSQSClient( region ); + AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously // getTopicArn will create the SNS topic if it doesn't exist - String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true); - topicArns.put(topicArn, regionName); + String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true ); + topicArns.put( topicArn, regionName ); // create the SQS queue if it doesn't exist - String queueArn = AmazonNotificationUtils.getQueueArnByName(sqsClient, queueName); - if (queueArn == null) { - queueUrl = AmazonNotificationUtils.createQueue(sqsClient, queueName, fig); - queueArn = AmazonNotificationUtils.getQueueArnByUrl(sqsClient, queueUrl); + String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName ); + if ( queueArn == null ) { + queueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig ); + queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, queueUrl ); } - arrQueueArns.put(queueArn, regionName); + arrQueueArns.put( queueArn, regionName ); } - logger.debug("Creating Subscriptions..."); + logger.debug( "Creating Subscriptions..." ); - for (Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet()) { + for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) { String queueARN = queueArnEntry.getKey(); String strSqsRegion = queueArnEntry.getValue(); - Regions sqsRegions = Regions.fromName(strSqsRegion); - Region sqsRegion = Region.getRegion(sqsRegions); + Regions sqsRegions = Regions.fromName( strSqsRegion ); + Region sqsRegion = Region.getRegion( sqsRegions ); - AmazonSQSClient subscribeSqsClient = createSQSClient(sqsRegion); + AmazonSQSClient subscribeSqsClient = createSQSClient( sqsRegion ); // ensure the URL used to subscribe is for the correct name/region - String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName(subscribeSqsClient, queueName); + String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName( subscribeSqsClient, queueName ); // this list used later for adding permissions to queues List<String> topicArnList = new ArrayList<>(); - for (Map.Entry<String, String> topicArnEntry : topicArns.entrySet()) { + for ( Map.Entry<String, String> topicArnEntry : topicArns.entrySet() ) { String topicARN = topicArnEntry.getKey(); - topicArnList.add(topicARN); + topicArnList.add( topicARN ); String strSnsRegion = topicArnEntry.getValue(); - Regions snsRegions = Regions.fromName(strSnsRegion); - Region snsRegion = Region.getRegion(snsRegions); + Regions snsRegions = Regions.fromName( strSnsRegion ); + Region snsRegion = Region.getRegion( snsRegions ); - AmazonSNSClient subscribeSnsClient = createSNSClient(snsRegion); // do this stuff synchronously - SubscribeRequest subscribeRequest = new SubscribeRequest(topicARN, "sqs", queueARN); + AmazonSNSClient subscribeSnsClient = createSNSClient( snsRegion ); // do this stuff synchronously + SubscribeRequest subscribeRequest = new SubscribeRequest( topicARN, "sqs", queueARN ); try { - logger.info("Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]", - queueARN, - strSqsRegion, - topicARN, - strSnsRegion - ); + logger.info( "Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]", queueARN, + strSqsRegion, topicARN, strSnsRegion ); - SubscribeResult subscribeResult = subscribeSnsClient.subscribe(subscribeRequest); + SubscribeResult subscribeResult = subscribeSnsClient.subscribe( subscribeRequest ); String subscriptionARN = subscribeResult.getSubscriptionArn(); - if(logger.isDebugEnabled()){ - logger.debug("Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]", queueARN, topicARN, subscriptionARN); + if ( logger.isDebugEnabled() ) { + logger.debug( + "Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]", + queueARN, topicARN, subscriptionARN ); } - - - } catch (Exception e) { - logger.error(String.format("ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]", - queueARN, - strSqsRegion, - topicARN, - strSnsRegion), e); - - + } + catch ( Exception e ) { + logger.error( String + .format( "ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]", + queueARN, strSqsRegion, topicARN, strSnsRegion ), e ); } } - logger.info("Adding permission to receive messages..."); + logger.info( "Adding permission to receive messages..." ); // add permission to each queue, providing a list of topics that it's subscribed to - AmazonNotificationUtils.setQueuePermissionsToReceive(subscribeSqsClient, subscribeQueueUrl, topicArnList); - + AmazonNotificationUtils + .setQueuePermissionsToReceive( subscribeSqsClient, subscribeQueueUrl, topicArnList ); } } return primaryTopicArn; } + /** * The Asynchronous SNS client is used for publishing events to SNS. - * */ - private AmazonSNSAsyncClient createAsyncSNSClient(final Region region, final ExecutorService executor) { + private AmazonSNSAsyncClient createAsyncSNSClient( final Region region, final ExecutorService executor ) { final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); - final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), executor); + final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient( ugProvider.getCredentials(), executor ); - sns.setRegion(region); + sns.setRegion( region ); return sns; } @@ -326,11 +339,8 @@ public class SNSQueueManagerImpl implements QueueManager { /** * Create the async sqs client - * @param region - * @param executor - * @return */ - private AmazonSQSAsyncClient createAsyncSQSClient(final Region region, final ExecutorService executor){ + private AmazonSQSAsyncClient createAsyncSQSClient( final Region region, final ExecutorService executor ) { final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); final AmazonSQSAsyncClient sqs = new AmazonSQSAsyncClient( ugProvider.getCredentials(), executor ); @@ -338,173 +348,209 @@ public class SNSQueueManagerImpl implements QueueManager { sqs.setRegion( region ); return sqs; - } + /** * The Synchronous SNS client is used for creating topics and subscribing queues. - * */ - private AmazonSNSClient createSNSClient(final Region region) { + private AmazonSNSClient createSNSClient( final Region region ) { final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); - final AmazonSNSClient sns = new AmazonSNSClient(ugProvider.getCredentials()); + final AmazonSNSClient sns = new AmazonSNSClient( ugProvider.getCredentials() ); - sns.setRegion(region); + sns.setRegion( region ); return sns; } private String getName() { - String name = clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace() + "_" + scope.getName() + "_" + scope.getRegionImplementation(); + String name = + clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace() + "_" + scope.getName() + "_" + + scope.getRegionImplementation(); name = name.toLowerCase(); //user lower case values - Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters"); + Preconditions.checkArgument( name.length() <= 80, "Your name must be < than 80 characters" ); return name; } + public Queue getReadQueue() { String queueName = getName(); try { - return readQueueUrlMap.get(queueName); - } catch (ExecutionException ee) { - throw new RuntimeException(ee); + return readQueueUrlMap.get( queueName ); + } + catch ( ExecutionException ee ) { + throw new RuntimeException( ee ); } } + public String getWriteTopicArn() { try { - return writeTopicArnMap.get(getName()); - - } catch (ExecutionException ee) { - throw new RuntimeException(ee); + return writeTopicArnMap.get( getName() ); + } + catch ( ExecutionException ee ) { + throw new RuntimeException( ee ); } } + @Override - public rx.Observable<QueueMessage> getMessages(final int limit, - final int transactionTimeout, - final int waitTime, - final Class klass) { + public rx.Observable<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime, + final Class klass ) { - if (sqs == null) { - logger.error("SQS is null - was not initialized properly"); + if ( sqs == null ) { + logger.error( "SQS is null - was not initialized properly" ); return rx.Observable.empty(); } String url = getReadQueue().getUrl(); - if (logger.isDebugEnabled()) logger.debug("Getting up to {} messages from {}", limit, url); + if ( logger.isDebugEnabled() ) { + logger.debug( "Getting up to {} messages from {}", limit, url ); + } - ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url); - receiveMessageRequest.setMaxNumberOfMessages(limit); - receiveMessageRequest.setVisibilityTimeout(Math.max(1, transactionTimeout / 1000)); - receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000); + ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url ); + receiveMessageRequest.setMaxNumberOfMessages( limit ); + receiveMessageRequest.setVisibilityTimeout( Math.max( 1, transactionTimeout / 1000 ) ); + receiveMessageRequest.setWaitTimeSeconds( waitTime / 1000 ); try { - ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest); + ReceiveMessageResult result = sqs.receiveMessage( receiveMessageRequest ); List<Message> messages = result.getMessages(); - if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url); + if ( logger.isDebugEnabled() ) { + logger.debug( "Received {} messages from {}", messages.size(), url ); + } + + List<QueueMessage> queueMessages = new ArrayList<>( messages.size() ); - List<QueueMessage> queueMessages = new ArrayList<>(messages.size()); + for ( Message message : messages ) { - for (Message message : messages) { - Object body; + Object payload; final String originalBody = message.getBody(); try { - final JsonNode bodyNode = mapper.readTree(message.getBody()); - JsonNode bodyObj = bodyNode.has("Message") ? bodyNode.get("Message") : bodyNode; + final JsonNode bodyNode = mapper.readTree( message.getBody() ); + /** + * When a message originates from SNS it has a "Message" we have to extract + * it and then process it seperately + */ - final String bodyText = mapper.writeValueAsString( bodyObj );; + if ( bodyNode.has( "Message" ) ) { + final String snsNode = bodyNode.get( "Message" ).asText(); - body = fromString(bodyText, klass); - } catch (Exception e) { - logger.error(String.format("failed to deserialize message: %s", message.getBody()), e); - throw new RuntimeException(e); + payload = deSerializeSQSMessage( snsNode, klass ); + } + else { + payload = deSerializeSQSMessage( originalBody, klass ); + } + } + catch ( Exception e ) { + logger.error( String.format( "failed to deserialize message: %s", message.getBody() ), e ); + throw new RuntimeException( e ); } - QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type")); - queueMessage.setStringBody(originalBody); - queueMessages.add(queueMessage); + QueueMessage queueMessage = new QueueMessage( message.getMessageId(), message.getReceiptHandle(), payload, + message.getAttributes().get( "type" ) ); + queueMessage.setStringBody( originalBody ); + queueMessages.add( queueMessage ); } - return rx.Observable.from(queueMessages); - - } catch (com.amazonaws.services.sqs.model.QueueDoesNotExistException dne) { - logger.error(String.format("Queue does not exist! [%s]", url), dne); - } catch (Exception e) { - logger.error(String.format("Programming error getting messages from queue=[%s] exist!", url), e); + return rx.Observable.from( queueMessages ); + } + catch ( com.amazonaws.services.sqs.model.QueueDoesNotExistException dne ) { + logger.error( String.format( "Queue does not exist! [%s]", url ), dne ); + } + catch ( Exception e ) { + logger.error( String.format( "Programming error getting messages from queue=[%s] exist!", url ), e ); } - return rx.Observable.from(new ArrayList<>(0)); + return rx.Observable.from( new ArrayList<>( 0 ) ); } + + /** + * Take a string, possibly escaped via SNS, and run it through our mapper to create an object) + */ + private Object deSerializeSQSMessage( final String message, final Class type ) { + try { + final Object o = mapper.readValue( message, type ); + return o; + } + catch ( Exception e ) { + throw new RuntimeException( "Unable to deserialize message " + message + " for class " + type, e ); + } + } + + @Override public long getQueueDepth() { String key = "ApproximateNumberOfMessages"; try { - GetQueueAttributesResult result = sqs.getQueueAttributes(getReadQueue().getUrl(), Collections.singletonList(key)); - String depthString = result.getAttributes().get(key); - return depthString != null ? Long.parseLong(depthString) : 0; - }catch (Exception e){ - logger.error("Exception getting queue depth",e); + GetQueueAttributesResult result = + sqs.getQueueAttributes( getReadQueue().getUrl(), Collections.singletonList( key ) ); + String depthString = result.getAttributes().get( key ); + return depthString != null ? Long.parseLong( depthString ) : 0; + } + catch ( Exception e ) { + logger.error( "Exception getting queue depth", e ); return -1; - } } @Override public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException { - if (snsAsync == null) { - logger.error("SNS client is null, perhaps it failed to initialize successfully"); - return; - } - - final String stringBody = toString(body); + if ( snsAsync == null ) { + logger.error( "SNS client is null, perhaps it failed to initialize successfully" ); + return; + } - String topicArn = getWriteTopicArn(); + final String stringBody = toString( body ); - if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn); + String topicArn = getWriteTopicArn(); - PublishRequest publishRequest = new PublishRequest(topicArn, stringBody); + if ( logger.isDebugEnabled() ) { + logger.debug( "Publishing Message...{} to arn: {}", stringBody, topicArn ); + } - snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() { - @Override - public void onError( Exception e ) { - logger.error( "Error publishing message... {}", e ); - } + PublishRequest publishRequest = new PublishRequest( topicArn, stringBody ); + snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() { + @Override + public void onError( Exception e ) { + logger.error( "Error publishing message... {}", e ); + } - @Override - public void onSuccess( PublishRequest request, PublishResult result ) { - if ( logger.isDebugEnabled() ) logger - .debug( "Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(), - request.getTopicArn() ); - } - } ); + @Override + public void onSuccess( PublishRequest request, PublishResult result ) { + if ( logger.isDebugEnabled() ) { + logger.debug( "Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(), + request.getTopicArn() ); + } + } + } ); } @Override - public void sendMessages(final List bodies) throws IOException { + public void sendMessages( final List bodies ) throws IOException { - if (snsAsync == null) { - logger.error("SNS client is null, perhaps it failed to initialize successfully"); + if ( snsAsync == null ) { + logger.error( "SNS client is null, perhaps it failed to initialize successfully" ); return; } - for (Object body : bodies) { - sendMessage((Serializable)body); + for ( Object body : bodies ) { + sendMessage( ( Serializable ) body ); } - } @@ -545,94 +591,81 @@ public class SNSQueueManagerImpl implements QueueManager { } ); } + @Override public void deleteQueue() { - logger.warn("Deleting queue: "+getReadQueue().getUrl()); - sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl())); - logger.warn("Deleting queue: "+getReadQueue().getUrl()+"_dead"); - sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()+"_dead")); - + logger.warn( "Deleting queue: " + getReadQueue().getUrl() ); + sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() ) ); + logger.warn( "Deleting queue: " + getReadQueue().getUrl() + "_dead" ); + sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() + "_dead" ) ); } @Override - public void commitMessage(final QueueMessage queueMessage) { + public void commitMessage( final QueueMessage queueMessage ) { String url = getReadQueue().getUrl(); - if (logger.isDebugEnabled()) - logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url); + if ( logger.isDebugEnabled() ) { + logger.debug( "Commit message {} to queue {}", queueMessage.getMessageId(), url ); + } - sqs.deleteMessage(new DeleteMessageRequest() - .withQueueUrl(url) - .withReceiptHandle(queueMessage.getHandle())); + sqs.deleteMessage( + new DeleteMessageRequest().withQueueUrl( url ).withReceiptHandle( queueMessage.getHandle() ) ); } @Override - public void commitMessages(final List<QueueMessage> queueMessages) { + public void commitMessages( final List<QueueMessage> queueMessages ) { String url = getReadQueue().getUrl(); - if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url); + if ( logger.isDebugEnabled() ) { + logger.debug( "Commit messages {} to queue {}", queueMessages.size(), url ); + } List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(); - for (QueueMessage message : queueMessages) { - entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle())); + for ( QueueMessage message : queueMessages ) { + entries.add( new DeleteMessageBatchRequestEntry( message.getMessageId(), message.getHandle() ) ); } - DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries); - DeleteMessageBatchResult result = sqs.deleteMessageBatch(request); + DeleteMessageBatchRequest request = new DeleteMessageBatchRequest( url, entries ); + DeleteMessageBatchResult result = sqs.deleteMessageBatch( request ); boolean successful = result.getFailed().size() <= 0; - if (!successful) { - for (BatchResultErrorEntry failed : result.getFailed()) { - logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId()); + if ( !successful ) { + for ( BatchResultErrorEntry failed : result.getFailed() ) { + logger.error( "Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId() ); } } } /** - * Read the object from Base64 string. - */ - - private Object fromString(final String s, final Class klass) - throws IOException, ClassNotFoundException { - - Object o = mapper.readValue(s, klass); - return o; - } - - /** * Write the object to a Base64 string. */ - private String toString(final Object o) throws IOException { - return mapper.writeValueAsString(o); + private String toString( final Object o ) throws IOException { + return mapper.writeValueAsString( o ); } /** * Get the region - * - * @return */ private Region getRegion() { - Regions regions = Regions.fromName(fig.getRegion()); - return Region.getRegion(regions); + Regions regions = Regions.fromName( fig.getRegion() ); + return Region.getRegion( regions ); } /** * Create the SQS client for the specified settings */ - private AmazonSQSClient createSQSClient(final Region region) { + private AmazonSQSClient createSQSClient( final Region region ) { final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); - final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials()); + final AmazonSQSClient sqs = new AmazonSQSClient( ugProvider.getCredentials() ); - sqs.setRegion(region); + sqs.setRegion( region ); return sqs; } - - } http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java deleted file mode 100644 index 0c56c05..0000000 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.queue.impl; - - -import java.io.IOException; -import java.io.Serializable; -import java.util.*; -import java.util.concurrent.ExecutionException; - -import com.amazonaws.services.sqs.model.*; -import org.apache.usergrid.persistence.core.guicyfig.ClusterFig; -import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.queue.Queue; -import org.apache.usergrid.persistence.queue.QueueFig; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueMessage; -import org.apache.usergrid.persistence.queue.QueueScope; - -import com.amazonaws.regions.Region; -import com.amazonaws.regions.Regions; -import com.amazonaws.services.sqs.AmazonSQSClient; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; -import com.google.common.base.Preconditions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -public class SQSQueueManagerImpl implements QueueManager { - private static final Logger logger = LoggerFactory.getLogger(SQSQueueManagerImpl.class); - - - private final QueueScope scope; - private ObjectMapper mapper; - protected final QueueFig fig; - private final ClusterFig clusterFig; - protected final AmazonSQSClient sqs; - - private static SmileFactory smileFactory = new SmileFactory(); - - private LoadingCache<String, Queue> urlMap = CacheBuilder.newBuilder() - .maximumSize(1000) - .build(new CacheLoader<String, Queue>() { - @Override - public Queue load(String queueName) throws Exception { - - //the amazon client is not thread safe, we need to create one per queue - Queue queue = null; - - try { - - GetQueueUrlResult result = sqs.getQueueUrl(queueName); - queue = new Queue(result.getQueueUrl()); - - } catch (QueueDoesNotExistException queueDoesNotExistException) { - //no op, swallow - logger.error("Queue {} does not exist, creating", queueName); - - } catch (Exception e) { - logger.error("failed to get queue from service", e); - throw e; - } - - if (queue == null) { - - final String deadletterQueueName = String.format("%s_dead", queueName); - final Map<String, String> deadLetterAttributes = new HashMap<>(2); - - deadLetterAttributes.put("MessageRetentionPeriod", fig.getDeadletterRetentionPeriod()); - CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest() - .withQueueName(deadletterQueueName).withAttributes(deadLetterAttributes); - - final CreateQueueResult deadletterResult = sqs.createQueue(createDeadLetterQueueRequest); - logger.info("Created deadletter queue with url {}", deadletterResult.getQueueUrl()); - - final String deadletterArn = AmazonNotificationUtils.getQueueArnByName(sqs, deadletterQueueName); - - String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\"," + - " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn); - - final Map<String, String> queueAttributes = new HashMap<>(2); - deadLetterAttributes.put("MessageRetentionPeriod", fig.getRetentionPeriod()); - deadLetterAttributes.put("RedrivePolicy", redrivePolicy); - - CreateQueueRequest createQueueRequest = new CreateQueueRequest(). - withQueueName(queueName) - .withAttributes(queueAttributes); - - CreateQueueResult result = sqs.createQueue(createQueueRequest); - - String url = result.getQueueUrl(); - queue = new Queue(url); - - logger.info("Created queue with url {}", url); - } - - return queue; - } - }); - - - @Inject - public SQSQueueManagerImpl(@Assisted QueueScope scope, final QueueFig fig, final ClusterFig clusterFig) { - - this.scope = scope; - this.fig = fig; - this.clusterFig = clusterFig; - try { - - smileFactory.delegateToTextual(true); - mapper = new ObjectMapper(smileFactory); - //pretty print, disabling for speed -// mapper.enable(SerializationFeature.INDENT_OUTPUT); - mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class"); - sqs = createClient(); - - } catch (Exception e) { - throw new RuntimeException("Error setting up mapper", e); - } - } - - - protected String getName() { - - String name = clusterFig.getClusterName() + "_" + scope.getName(); - - Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters"); - - return name; - } - - public Queue getQueue() { - - try { - Queue queue = urlMap.get(getName()); - return queue; - } catch (ExecutionException ee) { - throw new RuntimeException(ee); - } - } - - @Override - public rx.Observable<QueueMessage> getMessages(final int limit, - final int transactionTimeout, - final int waitTime, - final Class klass) { - - if (sqs == null) { - logger.error("Sqs is null"); - return rx.Observable.empty(); - } - - String url = getQueue().getUrl(); - - if (logger.isDebugEnabled()) logger.debug("Getting Max {} messages from {}", limit, url); - - ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url); - receiveMessageRequest.setMaxNumberOfMessages(limit); - receiveMessageRequest.setVisibilityTimeout(transactionTimeout / 1000); - receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000); - ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest); - List<Message> messages = result.getMessages(); - - if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url); - - List<QueueMessage> queueMessages = new ArrayList<>(messages.size()); - - for (Message message : messages) { - Object body; - - try { - body = fromString(message.getBody(), klass); - } catch (Exception e) { - logger.error("failed to deserialize message", e); - throw new RuntimeException(e); - } - - QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type")); - queueMessage.setStringBody(message.getBody()); - queueMessages.add(queueMessage); - } - - return rx.Observable.from(queueMessages); - } - - @Override - public long getQueueDepth() { - String key = "ApproximateNumberOfMessages"; - try { - GetQueueAttributesResult result = sqs.getQueueAttributes(new GetQueueAttributesRequest().withAttributeNames(key)); - String depthString = result.getAttributes().get(key); - return depthString != null ? Long.parseLong(depthString) : 0; - }catch (Exception e){ - logger.error("Exception getting queue depth",e); - return -1; - - } - } - @Override - public void sendMessages(final List bodies) throws IOException { - - if (sqs == null) { - logger.error("Sqs is null"); - return; - } - String url = getQueue().getUrl(); - - if (logger.isDebugEnabled()) logger.debug("Sending Messages...{} to {}", bodies.size(), url); - - SendMessageBatchRequest request = new SendMessageBatchRequest(url); - List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size()); - - for (Object body : bodies) { - SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry(); - entry.setId(UUID.randomUUID().toString()); - entry.setMessageBody(toString(body)); - entry.addMessageAttributesEntry("type", new MessageAttributeValue().withStringValue("mytype")); - entries.add(entry); - } - - request.setEntries(entries); - sqs.sendMessageBatch(request); - - } - - - @Override - public <T extends Serializable> void sendMessage( final T body ) throws IOException { - - if (sqs == null) { - logger.error("Sqs is null"); - return; - } - - String url = getQueue().getUrl(); - - if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url); - - final String stringBody = toString(body); - - SendMessageRequest request = new SendMessageRequest(url, stringBody); - sqs.sendMessage(request); - } - - - - @Override - public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException { - sendMessage( body ); - } - - - - @Override - public void commitMessage(final QueueMessage queueMessage) { - - String url = getQueue().getUrl(); - if (logger.isDebugEnabled()) logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url); - - sqs.deleteMessage(new DeleteMessageRequest() - .withQueueUrl(url) - .withReceiptHandle(queueMessage.getHandle())); - } - - - @Override - public void commitMessages(final List<QueueMessage> queueMessages) { - - String url = getQueue().getUrl(); - if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url); - - List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(); - - for (QueueMessage message : queueMessages) { - entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle())); - } - - DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries); - DeleteMessageBatchResult result = sqs.deleteMessageBatch(request); - - boolean successful = result.getFailed().size() <= 0; - - if (!successful) { - - for (BatchResultErrorEntry failed : result.getFailed()) { - logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId()); - } - } - } - - - /** - * Read the object from Base64 string. - */ - private Object fromString(final String s, - final Class klass) throws IOException, ClassNotFoundException { - Object o = mapper.readValue(s, klass); - return o; - } - - /** - * Write the object to a Base64 string. - */ - protected String toString(final Object o) throws IOException { - return mapper.writeValueAsString(o); - } - - - /** - * Get the region - * - * @return - */ - protected Region getRegion() { - Regions regions = Regions.fromName(fig.getRegion()); - Region region = Region.getRegion(regions); - return region; - } - - @Override - public void deleteQueue() { - logger.warn("Deleting queue: "+getQueue().getUrl()); - sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getQueue().getUrl())); - } - - - - /** - * Create the SQS client for the specified settings - */ - private AmazonSQSClient createClient() { - final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); - final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials()); - final Region region = getRegion(); - sqs.setRegion(region); - - return sqs; - } - - -}