Repository: usergrid Updated Branches: refs/heads/hotfix_20171205 71169f89a -> acbecde82
don't update SQS queue permissions if the SQS queue already exists Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/acbecde8 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/acbecde8 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/acbecde8 Branch: refs/heads/hotfix_20171205 Commit: acbecde8282cd987c07054770e9a990f771511ff Parents: 71169f8 Author: Mike Dunker <mdun...@google.com> Authored: Thu Dec 7 18:37:38 2017 -0800 Committer: Mike Dunker <mdun...@google.com> Committed: Thu Dec 7 18:37:38 2017 -0800 ---------------------------------------------------------------------- .../queue/impl/SNSQueueManagerImpl.java | 29 +++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/acbecde8/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index b5d52dc..60554c4 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -195,6 +195,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { logger.trace( "SNS/SQS Setup: primaryQueueArn={}", primaryQueueArn ); } + boolean localQueueCreated = false; if ( primaryQueueArn == null ) { if ( logger.isTraceEnabled() ) { logger.trace( "SNS/SQS Setup: primaryQueueArn is null, creating queue..." ); @@ -202,6 +203,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { queueUrl = AmazonNotificationUtils.createQueue( sqs, queueName, fig ); primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl( sqs, queueUrl ); + localQueueCreated = true; if ( logger.isTraceEnabled() ) { logger.trace( "SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn ); @@ -213,10 +215,12 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { SubscribeRequest primarySubscribeRequest = new SubscribeRequest( primaryTopicArn, "sqs", primaryQueueArn ); sns.subscribe( primarySubscribeRequest ); - // ensure the SNS primary topic has permission to send to the primary SQS queue - List<String> primaryTopicArnList = new ArrayList<>(); - primaryTopicArnList.add( primaryTopicArn ); - AmazonNotificationUtils.setQueuePermissionsToReceive( sqs, queueUrl, primaryTopicArnList ); + if (localQueueCreated) { + // ensure the SNS primary topic has permission to send to the primary SQS queue + List<String> primaryTopicArnList = new ArrayList<>(); + primaryTopicArnList.add(primaryTopicArn); + AmazonNotificationUtils.setQueuePermissionsToReceive(sqs, queueUrl, primaryTopicArnList); + } } catch ( AmazonServiceException e ) { logger.error( @@ -235,9 +239,11 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 ); final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 ); + final Map<String, Boolean> queuesCreated = new HashMap<>(regionNames.length + 1); arrQueueArns.put(primaryQueueArn, fig.getPrimaryRegion()); topicArns.put(primaryTopicArn, fig.getPrimaryRegion()); + queuesCreated.put(fig.getPrimaryRegion(), localQueueCreated); for ( String regionName : regionNames ) { @@ -259,13 +265,19 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { topicArns.put( topicArn, regionName ); // create the SQS queue if it doesn't exist + boolean regionQueueCreated = false; String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName ); if ( queueArn == null ) { queueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig ); queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, queueUrl ); + regionQueueCreated = true; + } else if (regionName.equals(fig.getPrimaryRegion()) && localQueueCreated) { + // created SQS queue earlier + regionQueueCreated = true; } arrQueueArns.put( queueArn, regionName ); + queuesCreated.put( regionName, regionQueueCreated); } if (logger.isTraceEnabled()) { @@ -321,9 +333,12 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { if (logger.isTraceEnabled()) { logger.trace("Adding permission to receive messages..."); } - // add permission to each queue, providing a list of topics that it's subscribed to - AmazonNotificationUtils - .setQueuePermissionsToReceive( subscribeSqsClient, subscribeQueueUrl, topicArnList ); + Boolean regionQueueCreated = queuesCreated.get(strSqsRegion); + if (regionQueueCreated != null && regionQueueCreated) { + // add permission to queue, providing a list of topics that it's subscribed to + AmazonNotificationUtils + .setQueuePermissionsToReceive(subscribeSqsClient, subscribeQueueUrl, topicArnList); + } } }