Github user shawnfeldman commented on a diff in the pull request: https://github.com/apache/incubator-usergrid/pull/255#discussion_r31171446 --- Diff: stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java --- @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. The ASF licenses this file to You + * under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. For additional information regarding + * copyright in this work, please see the NOTICE file in the top level + * directory of this distribution. 
+ */ +package org.apache.usergrid.persistence.queue.impl; + + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.sns.AmazonSNSClient; +import com.amazonaws.services.sns.model.*; +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.*; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import org.apache.usergrid.persistence.queue.*; +import org.apache.usergrid.persistence.queue.Queue; +import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ExecutionException; + +public class SNSQueueManagerImpl implements QueueManager { + + private static final Logger logger = LoggerFactory.getLogger(SNSQueueManagerImpl.class); + + private final QueueScope scope; + private ObjectMapper mapper; + private final QueueFig fig; + private final AmazonSQSClient sqs; + private final AmazonSNSClient sns; + + private static SmileFactory smileFactory = new SmileFactory(); + + private final LoadingCache<String, String> writeTopicArnMap = CacheBuilder.newBuilder() + .maximumSize(1000) + .build(new CacheLoader<String, String>() { + @Override + public String load(String queueName) + throws Exception { + + String primaryTopicArn = setupMultiRegion(queueName); + + return primaryTopicArn; + } + }); + + private final LoadingCache<String, Queue> readQueueUrlMap = CacheBuilder.newBuilder() + .maximumSize(1000) + .build(new CacheLoader<String, Queue>() { + @Override + public Queue load(String queueName) throws Exception { + + 
Queue queue = null; + + try { + GetQueueUrlResult result = sqs.getQueueUrl(queueName); + queue = new Queue(result.getQueueUrl()); + } catch (QueueDoesNotExistException queueDoesNotExistException) { + logger.error("Queue {} does not exist, creating", queueName); + } catch (Exception e) { + logger.error("failed to get queue from service", e); + throw e; + } + + if (queue == null) { + String primaryTopicArn = setupMultiRegion(queueName); + + String url = AmazonNotificationUtils.getQueueArnByName(queueName, sqs); + queue = new Queue(url); + } + + return queue; + } + }); + + + @Inject + public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig) { + this.scope = scope; + this.fig = fig; + + try { + smileFactory.delegateToTextual(true); + mapper = new ObjectMapper(smileFactory); + mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class"); + + sqs = createSQSClient(getRegion()); + sns = createSNSClient(getRegion()); + + } catch (Exception e) { + throw new RuntimeException("Error setting up mapper", e); + } + } + + private String setupMultiRegion(final String queueName) + throws Exception { + + String primaryTopicArn = AmazonNotificationUtils.getTopicArn(queueName, sns, true); + + String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName(queueName, sqs); + + if (primaryQueueArn == null) { + String queueUrl = AmazonNotificationUtils.createQueue(queueName, sqs, fig); + primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl(queueUrl, sqs); + } + + AmazonNotificationUtils.subscribeQueueToTopic(primaryTopicArn, primaryQueueArn, sns); + + if (fig.isMultiRegion()) { + + String multiRegion = fig.getRegionList(); + String[] regionNames = multiRegion.split(","); + + final Set<String> arrQueueArns = new HashSet<>(regionNames.length + 1); + final Map<String, AmazonSNSClient> topicArns = new HashMap<>(regionNames.length + 1); + + arrQueueArns.add(primaryQueueArn); + topicArns.put(primaryTopicArn, sns); + + for (String regionName : 
regionNames) { --- End diff -- change to observables if possible
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---