Repository: usergrid
Updated Branches:
  refs/heads/hotfix_20171205 71169f89a -> 2b3573377


update SNS topic permissions for SQS queues when necessary


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2b357337
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2b357337
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2b357337

Branch: refs/heads/hotfix_20171205
Commit: 2b3573377a96eda1341fd61568195476744aabd0
Parents: 71169f8
Author: Mike Dunker <mdun...@google.com>
Authored: Mon Dec 11 14:49:57 2017 -0800
Committer: Mike Dunker <mdun...@google.com>
Committed: Mon Dec 11 14:49:57 2017 -0800

----------------------------------------------------------------------
 .../queue/util/AmazonNotificationUtils.java     | 135 ++++++++++++++-----
 1 file changed, 101 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2b357337/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index 56bef91..b2b209c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -1,21 +1,16 @@
 package org.apache.usergrid.persistence.queue.util;
 
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
+import com.amazonaws.auth.policy.*;
+import com.amazonaws.auth.policy.conditions.ArnCondition;
+import com.amazonaws.services.sqs.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.queue.LegacyQueueFig;
 
-import com.amazonaws.auth.policy.Condition;
-import com.amazonaws.auth.policy.Policy;
-import com.amazonaws.auth.policy.Principal;
-import com.amazonaws.auth.policy.Resource;
-import com.amazonaws.auth.policy.Statement;
 import com.amazonaws.auth.policy.actions.SQSActions;
 import com.amazonaws.auth.policy.conditions.ConditionFactory;
 import com.amazonaws.services.sns.AmazonSNSClient;
@@ -23,13 +18,6 @@ import com.amazonaws.services.sns.model.CreateTopicResult;
 import com.amazonaws.services.sns.model.ListTopicsResult;
 import com.amazonaws.services.sns.model.Topic;
 import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.CreateQueueRequest;
-import com.amazonaws.services.sqs.model.CreateQueueResult;
-import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
-import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
-import com.amazonaws.services.sqs.model.GetQueueUrlResult;
-import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
-import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
 
 
 /**
@@ -85,34 +73,113 @@ public class AmazonNotificationUtils {
     public static void setQueuePermissionsToReceive( final AmazonSQSClient 
sqs, final String queueUrl,
                                                      final List<String> 
topicARNs ) throws Exception {
 
-        String queueARN = getQueueArnByUrl( sqs, queueUrl );
-
-        Statement statement = new Statement( Statement.Effect.Allow 
).withActions( SQSActions.SendMessage )
-                                                                     
.withPrincipals( new Principal( "*" ) )
-                                                                     
.withResources( new Resource( queueARN ) );
+        // retrieve queue ARN and policy
+        List<String> sqsAttrNames = 
Arrays.asList(QueueAttributeName.QueueArn.toString(),
+            QueueAttributeName.Policy.toString());
+        GetQueueAttributesRequest getQueueAttributesRequest =
+            new GetQueueAttributesRequest( queueUrl ).withAttributeNames( 
sqsAttrNames );
+        GetQueueAttributesResult queueAttributesResult = 
sqs.getQueueAttributes( getQueueAttributesRequest );
+        Map<String, String> sqsAttributeMap = 
queueAttributesResult.getAttributes();
+        String queueARN = 
sqsAttributeMap.get(QueueAttributeName.QueueArn.toString());
+        String policyJson = 
sqsAttributeMap.get(QueueAttributeName.Policy.toString());
+
+        // cannot send ARN in settings update, so remove it
+        sqsAttributeMap.remove(QueueAttributeName.QueueArn.toString());
+
+        // get existing policy from JSON
+        Policy policy = policyJson != null && policyJson.length() > 0 ? 
Policy.fromJson(policyJson) : new Policy();
+
+        // see if permissions already exist, and find ArnLike conditions
+        boolean matchingConditionFound = false;
+        boolean policyEdited = false;
+        for (Statement statement : policy.getStatements()) {
+            logger.info("statement id: {}, effect: {}, action: {}, 
resources:{}",
+                statement.getId(), statement.getEffect().name(),
+                statement.getActions().get(0).getActionName(),
+                statement.getResources().get(0).getId());
+
+            // must be Allow effect
+            if (! 
statement.getEffect().name().equals(Statement.Effect.Allow.name())) {
+                continue;
+            }
 
-        List<Condition> conditions = new ArrayList<>();
+            // must be SendMessage action
+            boolean actionFound = false;
+            for (Action action : statement.getActions()) {
+                // do lower case comparison, since UI adds SQS.SendMessage but 
SDK uses sqs.SendMessage
+                if 
(action.getActionName().toLowerCase().equals(SQSActions.SendMessage.getActionName().toLowerCase()))
 {
+                    actionFound = true;
+                    break;
+                }
+            }
+            if (!actionFound) {
+                continue;
+            }
 
-        for ( String topicARN : topicARNs ) {
+            // must be same queue resource
+            boolean queueResourceFound = false;
+            for (Resource resource : statement.getResources()) {
+                if (resource.getId().equals(queueARN)) {
+                    queueResourceFound = true;
+                    break;
+                }
+            }
+            if (!queueResourceFound) {
+                continue;
+            }
 
-            conditions.add( ConditionFactory.newSourceArnCondition( topicARN ) 
);
+            // found matching statement, check conditions for source ARN
+            for (Condition condition : statement.getConditions()) {
+                if (logger.isTraceEnabled()) {
+                    logger.trace("condition type: {}, conditionKey: {}", 
condition.getType(), condition.getConditionKey());
+                }
+                if 
(condition.getType().equals(ArnCondition.ArnComparisonType.ArnLike.name()) &&
+                    
condition.getConditionKey().equals(ConditionFactory.SOURCE_ARN_CONDITION_KEY)) {
+                    matchingConditionFound = true;
+                    for (String topicARN : topicARNs) {
+                        if (! condition.getValues().contains(topicARN)) {
+                            // topic doesn't exist, add it
+                            policyEdited = true;
+                            condition.getValues().add(topicARN);
+                        }
+                    }
+                }
+            }
         }
-        statement.setConditions( conditions );
 
-        Policy policy = new Policy( "SubscriptionPermission" ).withStatements( 
statement );
+        if (!matchingConditionFound) {
+            // never found ArnLike SourceArn condition, need to add a statement
+            List<Condition> conditions = new ArrayList<>();
 
+            for (String topicARN : topicARNs) {
 
-        final Map<String, String> queueAttributes = new HashMap<>();
-        queueAttributes.put( "Policy", policy.toJson() );
+                
conditions.add(ConditionFactory.newSourceArnCondition(topicARN));
+            }
 
-        SetQueueAttributesRequest queueAttributesRequest = new 
SetQueueAttributesRequest( queueUrl, queueAttributes );
+            Statement statement = new Statement(Statement.Effect.Allow)
+                .withPrincipals(Principal.AllUsers)
+                .withActions(SQSActions.SendMessage)
+                .withResources(new Resource(queueARN));
+            statement.setConditions(conditions);
 
-        try {
-            sqs.setQueueAttributes( queueAttributesRequest );
+            policy.getStatements().add(statement);
+            policyEdited = true;
         }
-        catch ( Exception e ) {
-            logger.error( "Failed to set permissions on QUEUE ARN=[{}] for 
TOPIC ARNs=[{}]", queueARN,
-                topicARNs.toString(), e );
+
+        if (policyEdited) {
+            sqsAttributeMap.put(QueueAttributeName.Policy.toString(), 
policy.toJson());
+
+            // log if permissions are being updated
+            logger.info("updating permissions for queueARN: {}, new policy: 
{}", queueARN, policy.toJson());
+
+            SetQueueAttributesRequest setQueueAttributesRequest = new 
SetQueueAttributesRequest(queueUrl, sqsAttributeMap);
+
+            try {
+                sqs.setQueueAttributes(setQueueAttributesRequest);
+            } catch (Exception e) {
+                logger.error("Failed to set permissions on QUEUE ARN=[{}] for 
TOPIC ARNs=[{}]", queueARN,
+                    topicARNs.toString(), e);
+            }
         }
     }
 

Reply via email to