Repository: usergrid Updated Branches: refs/heads/hotfix_20171205 71169f89a -> 2b3573377
update SNS topic permissions for SQS queues when necessary Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2b357337 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2b357337 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2b357337 Branch: refs/heads/hotfix_20171205 Commit: 2b3573377a96eda1341fd61568195476744aabd0 Parents: 71169f8 Author: Mike Dunker <mdun...@google.com> Authored: Mon Dec 11 14:49:57 2017 -0800 Committer: Mike Dunker <mdun...@google.com> Committed: Mon Dec 11 14:49:57 2017 -0800 ---------------------------------------------------------------------- .../queue/util/AmazonNotificationUtils.java | 135 ++++++++++++++----- 1 file changed, 101 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/2b357337/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java index 56bef91..b2b209c 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java @@ -1,21 +1,16 @@ package org.apache.usergrid.persistence.queue.util; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import com.amazonaws.auth.policy.*; +import com.amazonaws.auth.policy.conditions.ArnCondition; +import com.amazonaws.services.sqs.model.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.queue.LegacyQueueFig; -import com.amazonaws.auth.policy.Condition; -import com.amazonaws.auth.policy.Policy; -import com.amazonaws.auth.policy.Principal; -import com.amazonaws.auth.policy.Resource; -import com.amazonaws.auth.policy.Statement; import com.amazonaws.auth.policy.actions.SQSActions; import com.amazonaws.auth.policy.conditions.ConditionFactory; import com.amazonaws.services.sns.AmazonSNSClient; @@ -23,13 +18,6 @@ import com.amazonaws.services.sns.model.CreateTopicResult; import com.amazonaws.services.sns.model.ListTopicsResult; import com.amazonaws.services.sns.model.Topic; import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.CreateQueueRequest; -import com.amazonaws.services.sqs.model.CreateQueueResult; -import com.amazonaws.services.sqs.model.GetQueueAttributesRequest; -import com.amazonaws.services.sqs.model.GetQueueAttributesResult; -import com.amazonaws.services.sqs.model.GetQueueUrlResult; -import com.amazonaws.services.sqs.model.QueueDoesNotExistException; -import com.amazonaws.services.sqs.model.SetQueueAttributesRequest; /** @@ -85,34 +73,113 @@ public class AmazonNotificationUtils { public static void setQueuePermissionsToReceive( final AmazonSQSClient sqs, final String queueUrl, final List<String> topicARNs ) throws Exception { - String queueARN = getQueueArnByUrl( sqs, queueUrl ); - - Statement statement = new Statement( Statement.Effect.Allow ).withActions( SQSActions.SendMessage ) - .withPrincipals( new Principal( "*" ) ) - .withResources( new Resource( queueARN ) ); + // retrieve queue ARN and policy + List<String> sqsAttrNames = Arrays.asList(QueueAttributeName.QueueArn.toString(), + QueueAttributeName.Policy.toString()); + GetQueueAttributesRequest getQueueAttributesRequest = + new GetQueueAttributesRequest( queueUrl ).withAttributeNames( sqsAttrNames ); + GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes( getQueueAttributesRequest ); + Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes(); + String queueARN = sqsAttributeMap.get(QueueAttributeName.QueueArn.toString()); + String policyJson = sqsAttributeMap.get(QueueAttributeName.Policy.toString()); + + // cannot send ARN in settings update, so remove it + sqsAttributeMap.remove(QueueAttributeName.QueueArn.toString()); + + // get existing policy from JSON + Policy policy = policyJson != null && policyJson.length() > 0 ? Policy.fromJson(policyJson) : new Policy(); + + // see if permissions already exist, and find ArnLike conditions + boolean matchingConditionFound = false; + boolean policyEdited = false; + for (Statement statement : policy.getStatements()) { + logger.info("statement id: {}, effect: {}, action: {}, resources:{}", + statement.getId(), statement.getEffect().name(), + statement.getActions().get(0).getActionName(), + statement.getResources().get(0).getId()); + + // must be Allow effect + if (! statement.getEffect().name().equals(Statement.Effect.Allow.name())) { + continue; + } - List<Condition> conditions = new ArrayList<>(); + // must be SendMessage action + boolean actionFound = false; + for (Action action : statement.getActions()) { + // do lower case comparison, since UI adds SQS.SendMessage but SDK uses sqs.SendMessage + if (action.getActionName().toLowerCase().equals(SQSActions.SendMessage.getActionName().toLowerCase())) { + actionFound = true; + break; + } + } + if (!actionFound) { + continue; + } - for ( String topicARN : topicARNs ) { + // must be same queue resource + boolean queueResourceFound = false; + for (Resource resource : statement.getResources()) { + if (resource.getId().equals(queueARN)) { + queueResourceFound = true; + break; + } + } + if (!queueResourceFound) { + continue; + } - conditions.add( ConditionFactory.newSourceArnCondition( topicARN ) ); + // found matching statement, check conditions for source ARN + for (Condition condition : statement.getConditions()) { + if (logger.isTraceEnabled()) { + logger.trace("condition type: {}, conditionKey: {}", condition.getType(), condition.getConditionKey()); + } + if (condition.getType().equals(ArnCondition.ArnComparisonType.ArnLike.name()) && + condition.getConditionKey().equals(ConditionFactory.SOURCE_ARN_CONDITION_KEY)) { + matchingConditionFound = true; + for (String topicARN : topicARNs) { + if (! condition.getValues().contains(topicARN)) { + // topic doesn't exist, add it + policyEdited = true; + condition.getValues().add(topicARN); + } + } + } + } } - statement.setConditions( conditions ); - Policy policy = new Policy( "SubscriptionPermission" ).withStatements( statement ); + if (!matchingConditionFound) { + // never found ArnLike SourceArn condition, need to add a statement + List<Condition> conditions = new ArrayList<>(); + for (String topicARN : topicARNs) { - final Map<String, String> queueAttributes = new HashMap<>(); - queueAttributes.put( "Policy", policy.toJson() ); + conditions.add(ConditionFactory.newSourceArnCondition(topicARN)); + } - SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest( queueUrl, queueAttributes ); + Statement statement = new Statement(Statement.Effect.Allow) + .withPrincipals(Principal.AllUsers) + .withActions(SQSActions.SendMessage) + .withResources(new Resource(queueARN)); + statement.setConditions(conditions); - try { - sqs.setQueueAttributes( queueAttributesRequest ); + policy.getStatements().add(statement); + policyEdited = true; } - catch ( Exception e ) { - logger.error( "Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN, - topicARNs.toString(), e ); + + if (policyEdited) { + sqsAttributeMap.put(QueueAttributeName.Policy.toString(), policy.toJson()); + + // log if permissions are being updated + logger.info("updating permissions for queueARN: {}, new policy: {}", queueARN, policy.toJson()); + + SetQueueAttributesRequest setQueueAttributesRequest = new SetQueueAttributesRequest(queueUrl, sqsAttributeMap); + + try { + sqs.setQueueAttributes(setQueueAttributesRequest); + } catch (Exception e) { + logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN, + topicARNs.toString(), e); + } } }