Add caching to queue manager so we don't create many instances of local or SNS queue manager impls for the same exact queue. Enhance the node.js integration tests.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/853d6486 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/853d6486 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/853d6486 Branch: refs/heads/asf-site Commit: 853d6486f82c51610bec6dc52d6fbde2cfe2da1a Parents: 9b8d1dc Author: Michael Russo <michaelaru...@gmail.com> Authored: Fri Feb 5 15:28:06 2016 -0800 Committer: Michael Russo <michaelaru...@gmail.com> Committed: Fri Feb 5 15:28:06 2016 -0800 ---------------------------------------------------------------------- .../persistence/entities/Notification.java | 2 +- .../usergrid/persistence/queue/QueueFig.java | 2 +- .../queue/impl/QueueManagerFactoryImpl.java | 51 ++- .../queue/impl/SNSQueueManagerImpl.java | 4 +- .../notifications/ApplicationQueueManager.java | 9 +- .../services/notifications/TaskManager.java | 68 ++-- .../impl/ApplicationQueueManagerImpl.java | 389 +++++++++---------- .../gcm/NotificationsServiceIT.java | 114 +++--- tests/integration/lib/entities.js | 2 +- tests/integration/lib/notifications.js | 1 + tests/integration/lib/notifiers.js | 43 ++ tests/integration/test/groups/groups.js | 55 +-- tests/integration/test/main.js | 5 +- .../test/notifications/notifications.js | 313 +++++++++++++++ tests/integration/test/teardown.js | 18 +- 15 files changed, 712 insertions(+), 364 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java index 34c7758..5c3ee89 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java @@ -90,7 +90,7 @@ public class Notification extends TypedEntity { @EntityProperty protected String priority; - /** Error messages that may have been encounted by Usergrid when trying to process the notification */ + /** Error messages that may have been encountered by Usergrid when trying to process the notification */ @EntityProperty protected String errorMessage; http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java index ad38f6d..cdab3e0 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java @@ -72,7 +72,7 @@ public interface QueueFig extends GuicyFig { // current msg size 1.2kb * 850000 = 1.02 GB (let this default be the most we'll queue in heap) @Key("usergrid.queue.publish.queuesize") - @Default("850000") + @Default("250000") int getAsyncQueueSize(); /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java index 0f78678..de9cac5 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java @@ -19,12 +19,18 @@ */ package org.apache.usergrid.persistence.queue.impl; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.usergrid.persistence.queue.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; /** * manages whether we take in an external in memory override for queues. @@ -32,10 +38,36 @@ import java.util.Map; @Singleton public class QueueManagerFactoryImpl implements QueueManagerFactory { + private static final Logger logger = LoggerFactory.getLogger( QueueManagerFactoryImpl.class ); private final QueueFig queueFig; private final QueueManagerInternalFactory queuemanagerInternalFactory; private final Map<String,QueueManager> defaultManager; + private final LoadingCache<QueueScope, QueueManager> queueManager = + CacheBuilder + .newBuilder() + .initialCapacity(5) + .maximumSize(100) + .build(new CacheLoader<QueueScope, QueueManager>() { + + @Override + public QueueManager load( QueueScope scope ) throws Exception { + + if ( queueFig.overrideQueueForDefault() ){ + + QueueManager manager = defaultManager.get( scope.getName() ); + if ( manager == null ) { + manager = new LocalQueueManager(); + defaultManager.put( scope.getName(), manager ); + } + return manager; + + } else { + return queuemanagerInternalFactory.getQueueManager(scope); + } + + } + }); @Inject public QueueManagerFactoryImpl(final QueueFig queueFig, final QueueManagerInternalFactory queuemanagerInternalFactory){ @@ -43,17 +75,18 @@ public class QueueManagerFactoryImpl implements QueueManagerFactory { this.queuemanagerInternalFactory = queuemanagerInternalFactory; this.defaultManager = new HashMap<>(10); } + @Override public QueueManager getQueueManager(QueueScope scope) { - if(queueFig.overrideQueueForDefault()){ - QueueManager manager = defaultManager.get(scope.getName()); - if(manager==null){ - manager = new LocalQueueManager(); - defaultManager.put(scope.getName(),manager); - } - return manager; - }else{ - return queuemanagerInternalFactory.getQueueManager(scope); + + try { + return queueManager.get(scope); + + } catch (ExecutionException e) { + + logger.error("Unable to load or retrieve queue manager from cache for queue {}", scope.getName()); + throw new RuntimeException(e); } + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index 3a1f045..8a503a5 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -564,7 +564,8 @@ public class SNSQueueManagerImpl implements QueueManager { logger.error( "SQS client is null, perhaps it failed to initialize successfully" ); return; } - + final long startSend = System.currentTimeMillis(); + logger.info("starting send message"); final String stringBody = toString( body ); String url = getReadQueue().getUrl(); @@ -575,6 +576,7 @@ public class SNSQueueManagerImpl implements QueueManager { SendMessageRequest request = new SendMessageRequest( url, stringBody ); + logger.info("now sending. time spent since starting to send in ms: {}", System.currentTimeMillis() - startSend); sqsAsync.sendMessageAsync( request, new AsyncHandler<SendMessageRequest, SendMessageResult>() { @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java index 6bbd117..3f0ca69 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java @@ -32,14 +32,15 @@ import java.util.List; */ public interface ApplicationQueueManager { - public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queue"; + String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queue"; - public static final String NOTIFIER_ID_POSTFIX = ".notifier.id"; + String NOTIFIER_ID_POSTFIX = ".notifier.id"; - public static final String DEFAULT_QUEUE_NAME = "push"; //keep this short as AWS limits queue name size to 80 chars + String DEFAULT_QUEUE_NAME = "push"; //keep this short as AWS limits queue name size to 80 chars /** * send notification to queue + * * @param notification * @param jobExecution * @throws Exception @@ -48,6 +49,7 @@ public interface ApplicationQueueManager { /** * send notifications to providers + * * @param messages * @param queuePath * @return @@ -61,6 +63,7 @@ public interface ApplicationQueueManager { /** * check for inactive devices, apple and google require this + * * @throws Exception */ void asyncCheckForInactiveDevices() throws Exception; http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java index 4f051e6..950447a 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java @@ -89,10 +89,10 @@ public class TaskManager { if (logger.isTraceEnabled()) { logger.trace("COUNT is: {}", successes.get()); } - if (hasFinished) { //process has finished but notifications are still coming in - finishedBatch(); - - } +// if (hasFinished) { //process has finished but notifications are still coming in +// finishedBatch(); +// +// } } } @@ -115,6 +115,7 @@ public class TaskManager { } } finally { completed(notifier, deviceUUID); + finishedBatch(); } } @@ -128,7 +129,7 @@ public class TaskManager { Receipt savedReceipt = em.create(receipt); receipt.setUuid(savedReceipt.getUuid()); List<EntityRef> entities = Arrays.asList(notification, device); -// em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, savedReceipt); + em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, savedReceipt); } else { em.update(receipt); } @@ -150,51 +151,34 @@ public class TaskManager { } } } + public void finishedBatch() throws Exception { - finishedBatch(true,false); + finishedBatch(true); } - public void finishedBatch(boolean fetch, boolean force) throws Exception { - - if (notification.getDebug() || getFailures() > 0 || force) { - long successes = this.successes.get(); //reset counters - long failures = this.failures.get(); //reset counters - - for (int i = 0; i < successes; i++) { - this.successes.decrementAndGet(); - } - for (int i = 0; i < failures; i++) { - this.failures.decrementAndGet(); - } - - this.hasFinished = true; + public void finishedBatch(boolean refreshNotification) throws Exception { - // refresh notification - if (fetch) - notification = em.get(this.notification.getUuid(), Notification.class); + long successes = this.successes.get(); //reset counters + long failures = this.failures.get(); //reset counters - //and write them out again, this will produce the most accurate count - Map<String, Long> stats = new HashMap<>(2); - stats.put("sent", successes); - stats.put("errors", failures); - notification.updateStatistics(successes, failures); + for (int i = 0; i < successes; i++) { + this.successes.decrementAndGet(); + } + for (int i = 0; i < failures; i++) { + this.failures.decrementAndGet(); + } - long totals = (notification.getStatistics().get("sent") + notification.getStatistics().get("errors")); - //none of this is known and should you ever do this - notification.setModified(System.currentTimeMillis()); - notification.setFinished(notification.getModified()); + this.hasFinished = true; - Map<String, Object> properties = new HashMap<>(); - properties.put("finished", notification.getModified()); - properties.put("state", notification.getState()); - notification.addProperties(properties); + // force refresh notification by fetching it + if (refreshNotification) { + notification = em.get(this.notification.getUuid(), Notification.class); + } - long latency = notification.getFinished() - notification.getStarted(); - logger.info("notification finished batch: {} of {} devices in {} ms", notification.getUuid(), totals, latency); + notification.updateStatistics(successes, failures); + notification.setModified(System.currentTimeMillis()); + notification.setFinished(notification.getModified()); - em.update(notification); -// Set<Notifier> notifiers = new HashSet<>(proxy.getAdapterMap().values()); // remove dups -// proxy.asyncCheckForInactiveDevices(notifiers); - } + em.update(notification); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index c956417..d0f8ca8 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -56,7 +56,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once - public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){ + public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties) { this.em = entityManager; this.qm = queueManager; this.jobScheduler = jobScheduler; @@ -67,13 +67,13 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { } - private boolean scheduleQueueJob(Notification notification) throws Exception{ + private boolean scheduleQueueJob(Notification notification) throws Exception { return jobScheduler.scheduleQueueJob(notification); } @Override public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception { - if(scheduleQueueJob(notification)){ + if (scheduleQueueJob(notification)) { em.update(notification); return; } @@ -94,110 +94,97 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { logger.trace("notification {} start queuing", notification.getUuid()); } - final PathQuery<Device> pathQuery = notification.getPathQuery().buildPathQuery() ; //devices query + final PathQuery<Device> pathQuery = notification.getPathQuery().buildPathQuery(); //devices query final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching - final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues + final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<>(); //build up list of issues //get devices in querystring, and make sure you have access if (pathQuery != null) { - final HashMap<Object,ProviderAdapter> notifierMap = getAdapterMap(); + final HashMap<Object, ProviderAdapter> notifierMap = getAdapterMap(); if (logger.isTraceEnabled()) { logger.trace("notification {} start query", notification.getUuid()); } final Iterator<Device> iterator = pathQuery.iterator(em); + //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) { + if(logger.isTraceEnabled()){ + logger.trace("Scheduling notification job as it has multiple pages of devices."); + } jobScheduler.scheduleQueueJob(notification, true); em.update(notification); return; } final UUID appId = em.getApplication().getUuid(); - final Map<String,Object> payloads = notification.getPayloads(); - - final Func1<Entity,Entity> entityListFunct = entity -> { + final Map<String, Object> payloads = notification.getPayloads(); + final Func1<EntityRef, EntityRef> sendMessageFunction = deviceRef -> { try { long now = System.currentTimeMillis(); - List<EntityRef> devicesRef = getDevices(entity); // resolve group - if (logger.isTraceEnabled()) { - logger.trace("notification {} queue {} devices, duration {} ms", notification.getUuid(), devicesRef.size(), (System.currentTimeMillis() - now)); - } + String notifierId = null; + String notifierKey = null; - for (EntityRef deviceRef : devicesRef) { - if (logger.isTraceEnabled()) { - logger.trace("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid()); + //find the device notifier info, match it to the payload + for (Map.Entry<String, Object> entry : payloads.entrySet()) { + ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase()); + now = System.currentTimeMillis(); + String providerId = getProviderId(deviceRef, adapter.getNotifier()); + if (providerId != null) { + notifierId = providerId; + notifierKey = entry.getKey().toLowerCase(); + break; } - String notifierId = null; - String notifierKey = null; - - //find the device notifier info, match it to the payload - for (Map.Entry<String, Object> entry : payloads.entrySet()) { - ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase()); - now = System.currentTimeMillis(); - String providerId = getProviderId(deviceRef, adapter.getNotifier()); - if (providerId != null) { - notifierId = providerId; - notifierKey = entry.getKey().toLowerCase(); - break; - } - if (logger.isTraceEnabled()) { - logger.trace("Provider query for notification {} device {} took {} ms", notification.getUuid(), deviceRef.getUuid(), (System.currentTimeMillis() - now)); - } + if (logger.isTraceEnabled()) { + logger.trace("Provider query for notification {} device {} took {} ms", notification.getUuid(), deviceRef.getUuid(), (System.currentTimeMillis() - now)); } + } - if (notifierId == null) { - logger.info("Notifier did not match for device {} ", deviceRef); - continue; - } + if (notifierId == null) { + return deviceRef; + } + + ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId); + if (notification.getQueued() == null) { + + // update queued time + notification.setQueued(System.currentTimeMillis()); - ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId); - if (notification.getQueued() == null) { - // update queued time - now = System.currentTimeMillis(); - notification.setQueued(System.currentTimeMillis()); - if (logger.isTraceEnabled()) { - logger.trace("notification {} device {} queue time set. duration {} ms", notification.getUuid(), deviceRef.getUuid(), (System.currentTimeMillis() - now)); - } - } - now = System.currentTimeMillis(); - qm.sendMessage(message); - if (logger.isTraceEnabled()) { - logger.trace("notification {} post-queue to device {} duration {} ms, {} queue", notification.getUuid(), deviceRef.getUuid(), (System.currentTimeMillis() - now), queueName); - } - deviceCount.incrementAndGet(); - queueMeter.mark(); } + qm.sendMessage(message); + deviceCount.incrementAndGet(); + queueMeter.mark(); + + } catch (Exception deviceLoopException) { - logger.error("Failed to add devices", deviceLoopException); - errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException); + logger.error("Failed to add device", deviceLoopException); + errorMessages.add("Failed to add device: " + deviceRef.getUuid() + ", error:" + deviceLoopException); } - return entity; + return deviceRef; }; - long now = System.currentTimeMillis(); //process up to 10 concurrently - Observable o = rx.Observable.create( new IteratorObservable<Entity>( iterator ) ) - .distinct( entity -> entity.getUuid() ) - .flatMap(entity -> - Observable.just(entity).map(entityListFunct) - .doOnError(throwable -> logger.error("Failed while writing", throwable)) , 10); + Observable processMessagesObservable = Observable.create(new IteratorObservable<Entity>(iterator)) + .flatMap(entity -> { + return Observable.from(getDevices(entity)); + }, 10) + .distinct(ref -> ref.getUuid()) + .map(sendMessageFunction) + .doOnError(throwable -> logger.error("Failed while trying to send notification", throwable)); + + processMessagesObservable.toBlocking().lastOrDefault(null); - o.toBlocking().lastOrDefault( null ); - if (logger.isTraceEnabled()) { - logger.trace("notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now); - } } // update queued time Map<String, Object> properties = new HashMap<>(2); properties.put("queued", notification.getQueued()); properties.put("state", notification.getState()); - if(errorMessages.size()>0){ + if (errorMessages.size() > 0) { if (notification.getErrorMessage() == null) { notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties"); } @@ -205,40 +192,33 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { notification.setExpectedCount(deviceCount.get()); notification.addProperties(properties); - long now = System.currentTimeMillis(); - + em.update(notification); - logger.info("notification {} updated notification duration {} ms", notification.getUuid(), System.currentTimeMillis() - now); - //do i have devices, and have i already started batching. - if (deviceCount.get() <= 0 || !notification.getDebug()) { + // if no devices, go ahead and mark the batch finished + if (deviceCount.get() <= 0 ) { TaskManager taskManager = new TaskManager(em, notification); - //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests - taskManager.finishedBatch(false,true); - }else { - em.update(notification); + taskManager.finishedBatch(true); } - long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0; - if (logger.isTraceEnabled()) { - logger.trace("notification {} done queuing to {} devices in {} ms", notification.getUuid().toString(), deviceCount.get(), elapsed); - } + } /** * only need to get notifiers once. will reset on next batch + * * @return */ - private HashMap<Object,ProviderAdapter> getAdapterMap(){ - if(notifierHashMap == null) { + private HashMap<Object, ProviderAdapter> getAdapterMap() { + if (notifierHashMap == null) { long now = System.currentTimeMillis(); - notifierHashMap = new HashMap<Object, ProviderAdapter>(); + notifierHashMap = new HashMap<>(); Query query = new Query(); query.setCollection("notifiers"); query.setLimit(100); - PathQuery<Notifier> pathQuery = new PathQuery<Notifier>( - new SimpleEntityRef(em.getApplicationRef()), - query + PathQuery<Notifier> pathQuery = new PathQuery<>( + new SimpleEntityRef(em.getApplicationRef()), + query ); Iterator<Notifier> notifierIterator = pathQuery.iterator(em); int count = 0; @@ -246,22 +226,22 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { Notifier notifier = notifierIterator.next(); String name = notifier.getName() != null ? notifier.getName() : ""; UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID(); - ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em); + ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier, em); notifierHashMap.put(name.toLowerCase(), providerAdapter); notifierHashMap.put(uuid, providerAdapter); notifierHashMap.put(uuid.toString(), providerAdapter); - if(count++ >= 100){ + if (count++ >= 100) { logger.error("ApplicationQueueManager: too many notifiers...breaking out ", notifierHashMap.size()); break; } } - logger.info("ApplicationQueueManager: fetching notifiers finished size={}, duration {} ms", notifierHashMap.size(),System.currentTimeMillis() - now); } return notifierHashMap; } /** * send batches of notifications to provider + * * @param messages * @throws Exception */ @@ -273,128 +253,123 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { final Map<Object, ProviderAdapter> notifierMap = getAdapterMap(); final ApplicationQueueManagerImpl proxy = this; - final ConcurrentHashMap<UUID,TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size()); - final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size()); - - final Func1<QueueMessage, ApplicationQueueMessage> func = new Func1<QueueMessage, ApplicationQueueMessage>() { - @Override - public ApplicationQueueMessage call(QueueMessage queueMessage) { - boolean messageCommitted = false; - ApplicationQueueMessage message = null; - try { - message = (ApplicationQueueMessage) queueMessage.getBody(); - if (logger.isTraceEnabled()) { - logger.trace("start sending notification for device {} for Notification: {} on thread {}", message.getDeviceId(), message.getNotificationId(), Thread.currentThread().getId()); - } + final ConcurrentHashMap<UUID, TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size()); + final ConcurrentHashMap<UUID, Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size()); - UUID deviceUUID = message.getDeviceId(); + final Func1<QueueMessage, ApplicationQueueMessage> func = queueMessage -> { + boolean messageCommitted = false; + ApplicationQueueMessage message = null; + try { + message = (ApplicationQueueMessage) queueMessage.getBody(); + if (logger.isTraceEnabled()) { + logger.trace("start sending notification for device {} for Notification: {} on thread {}", message.getDeviceId(), message.getNotificationId(), Thread.currentThread().getId()); + } - Notification notification = notificationMap.get(message.getNotificationId()); - if (notification == null) { - notification = em.get(message.getNotificationId(), Notification.class); - notificationMap.put(message.getNotificationId(), notification); - } - TaskManager taskManager = taskMap.get(message.getNotificationId()); - if (taskManager == null) { - taskManager = new TaskManager(em, notification); - taskMap.putIfAbsent(message.getNotificationId(), taskManager); - taskManager = taskMap.get(message.getNotificationId()); - } + UUID deviceUUID = message.getDeviceId(); - final Map<String, Object> payloads = notification.getPayloads(); - final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap); - if (logger.isTraceEnabled()) { - logger.trace("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid()); - } + Notification notification = notificationMap.get(message.getNotificationId()); + if (notification == null) { + notification = em.get(message.getNotificationId(), Notification.class); + notificationMap.put(message.getNotificationId(), notification); + } + TaskManager taskManager = taskMap.get(message.getNotificationId()); + if (taskManager == null) { + taskManager = new TaskManager(em, notification); + taskMap.putIfAbsent(message.getNotificationId(), taskManager); + taskManager = taskMap.get(message.getNotificationId()); + } - try { - String notifierName = message.getNotifierKey().toLowerCase(); - ProviderAdapter providerAdapter = notifierMap.get(notifierName.toLowerCase()); - Object payload = translatedPayloads.get(notifierName); - Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID); - TaskTracker tracker = new TaskTracker(providerAdapter.getNotifier(), taskManager, receipt, deviceUUID); - if(!isOkToSend(notification)){ - tracker.failed(0, "Notification is duplicate/expired/cancelled."); - }else { - if (payload == null) { - if (logger.isDebugEnabled()) { - logger.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid()); - } - tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier"); - } else { - long now = System.currentTimeMillis(); - try { - providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker); - } catch (Exception e) { - tracker.failed(0, e.getMessage()); - } finally { - if (logger.isTraceEnabled()) { - logger.trace("sending to device {} for Notification: {} duration {} ms", deviceUUID, notification.getUuid(), (System.currentTimeMillis() - now)); - } + final Map<String, Object> payloads = notification.getPayloads(); + final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap); + if (logger.isTraceEnabled()) { + logger.trace("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid()); + } + + try { + String notifierName = message.getNotifierKey().toLowerCase(); + ProviderAdapter providerAdapter = notifierMap.get(notifierName.toLowerCase()); + Object payload = translatedPayloads.get(notifierName); + Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID); + TaskTracker tracker = new TaskTracker(providerAdapter.getNotifier(), taskManager, receipt, deviceUUID); + if (!isOkToSend(notification)) { + tracker.failed(0, "Notification is duplicate/expired/cancelled."); + } else { + if (payload == null) { + if (logger.isDebugEnabled()) { + logger.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid()); + } + tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier"); + } else { + long now = System.currentTimeMillis(); + try { + providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker); + } catch (Exception e) { + tracker.failed(0, e.getMessage()); + } finally { + if (logger.isTraceEnabled()) { + logger.trace("sending to device {} for Notification: {} duration {} ms", deviceUUID, notification.getUuid(), (System.currentTimeMillis() - now)); } } } - messageCommitted = true; - } finally { - sendMeter.mark(); } + messageCommitted = true; + } finally { + sendMeter.mark(); + } - } catch (Exception e) { - logger.error("Failure while sending",e); - try { - if(!messageCommitted && queuePath != null) { - qm.commitMessage(queueMessage); - } - }catch (Exception queueException){ - logger.error("Failed to commit message.",queueException); + } catch (Exception e) { + logger.error("Failure while sending", e); + try { + if (!messageCommitted && queuePath != null) { + qm.commitMessage(queueMessage); } + } catch (Exception queueException) { + logger.error("Failed to commit message.", queueException); } - return message; } + return message; }; //from each queue message, process them in parallel up to 10 at a time - Observable o = rx.Observable.from( messages ).flatMap( queueMessage -> { + Observable queueMessageObservable = Observable.from(messages).flatMap(queueMessage -> { - return Observable.just( queueMessage ).map( func ).buffer( messages.size() ).map( queueMessages -> { + return Observable.just(queueMessage).map(func).buffer(messages.size()).map(queueMessages -> { //for gcm this will actually send notification - for ( ProviderAdapter providerAdapter : notifierMap.values() ) { + for (ProviderAdapter providerAdapter : notifierMap.values()) { try { providerAdapter.doneSendingNotifications(); - } - catch ( Exception e ) { - logger.error( "providerAdapter.doneSendingNotifications: ", e ); + } catch (Exception e) { + logger.error("providerAdapter.doneSendingNotifications: ", e); } } //TODO: check if a notification is done and mark it HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<>(); - for ( ApplicationQueueMessage message : queueMessages ) { - if ( notifications.get( message.getNotificationId() ) == null ) { + for (ApplicationQueueMessage message : queueMessages) { + if (notifications.get(message.getNotificationId()) == null) { try { - TaskManager taskManager = taskMap.get( message.getNotificationId() ); - notifications.put( message.getNotificationId(), message ); + TaskManager taskManager = taskMap.get(message.getNotificationId()); + notifications.put(message.getNotificationId(), message); taskManager.finishedBatch(); - } - catch ( Exception e ) { - logger.error( "Failed to finish batch", e ); + } catch (Exception e) { + logger.error("Failed to finish batch", e); } } } return notifications; - } ).doOnError( throwable -> logger.error( "Failed while sending", throwable ) ); - }, 10 ); + }).doOnError(throwable -> logger.error("Failed while sending", throwable)); + }, 10); - return o; + return queueMessageObservable; } @Override - public void stop(){ - for(ProviderAdapter adapter : getAdapterMap().values()){ + public void stop() { + for (ProviderAdapter adapter : getAdapterMap().values()) { try { adapter.stop(); - }catch (Exception e){ - logger.error("failed to stop adapter",e); + } catch (Exception e) { + logger.error("failed to stop adapter", e); } } } @@ -407,7 +382,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { */ private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter> notifierMap) throws Exception { - Map<String, Object> translatedPayloads = new HashMap<String, Object>( payloads.size()); + Map<String, Object> translatedPayloads = new HashMap<String, Object>(payloads.size()); for (Map.Entry<String, Object> entry : payloads.entrySet()) { String payloadKey = entry.getKey().toLowerCase(); Object payloadValue = entry.getValue(); @@ -431,10 +406,13 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> { private final Iterator<T> input; - private IteratorObservable( final Iterator input ) {this.input = input;} + + private IteratorObservable(final Iterator input) { + this.input = input; + } @Override - public void call( final Subscriber<? super T> subscriber ) { + public void call(final Subscriber<? super T> subscriber) { /** * You would replace this code with your file reading. Instead of emitting from an iterator, @@ -442,17 +420,16 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { */ try { - while ( !subscriber.isUnsubscribed() && input.hasNext() ) { + while (!subscriber.isUnsubscribed() && input.hasNext()) { //send our input to the next - subscriber.onNext( (T) input.next() ); + subscriber.onNext((T) input.next()); } //tell the subscriber we don't have any more data subscriber.onCompleted(); - } - catch ( Throwable t ) { - logger.error("failed on subscriber",t); - subscriber.onError( t ); + } catch (Throwable t) { + logger.error("failed on subscriber", t); + subscriber.onError(t); } } } @@ -483,45 +460,61 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { private boolean isOkToSend(Notification notification) { - Map<String,Long> stats = notification.getStatistics(); - if (stats != null && notification.getExpectedCount() == (stats.get("sent")+ stats.get("errors"))) { + Map<String, Long> stats = notification.getStatistics(); + if (stats != null && notification.getExpectedCount() == (stats.get("sent") + stats.get("errors"))) { if (logger.isDebugEnabled()) { logger.debug("notification {} already processed. not sending.", - notification.getUuid()); + notification.getUuid()); } return false; } if (notification.getCanceled() == Boolean.TRUE) { if (logger.isDebugEnabled()) { logger.debug("notification {} canceled. not sending.", - notification.getUuid()); + notification.getUuid()); } return false; } if (notification.isExpired()) { if (logger.isDebugEnabled()) { logger.debug("notification {} expired. not sending.", - notification.getUuid()); + notification.getUuid()); } return false; } return true; } - private List<EntityRef> getDevices(EntityRef ref) throws Exception { + private List<EntityRef> getDevices(EntityRef ref) { + List<EntityRef> devices = Collections.EMPTY_LIST; - if ("device".equals(ref.getType())) { - devices = Collections.singletonList(ref); - } else if ("user".equals(ref.getType())) { - devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT, + + try { + if ("device".equals(ref.getType())) { + devices = Collections.singletonList(ref); + } else if ("user".equals(ref.getType())) { + devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT, Query.Level.REFS, false).getRefs(); - } else if ("group".equals(ref.getType())) { - devices = new ArrayList<EntityRef>(); - for (EntityRef r : em.getCollection(ref, "users", null, + } else if ("group".equals(ref.getType())) { + devices = new ArrayList<>(); + for (EntityRef r : em.getCollection(ref, "users", null, Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) { - devices.addAll(getDevices(r)); + devices.addAll(getDevices(r)); + } + } + } catch (Exception e) { + + if (ref != null){ + logger.error("Error while retrieving devices for entity type {} and uuid {}. Error: {}", + ref.getType(), ref.getUuid(), e); + }else{ + logger.error("Error while retrieving devices. Entity ref was null."); } + + throw new RuntimeException("Unable to retrieve devices for EntityRef", e); + } + return devices; } @@ -534,7 +527,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { } return value != null ? value.toString() : null; } catch (Exception e) { - logger.error("Errer getting provider ID, proceding with rest of batch", e); + logger.error("Error getting provider ID, proceeding with rest of batch", e); return null; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java ---------------------------------------------------------------------- diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java index 97513be..65cc54a 100644 --- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java +++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java @@ -39,16 +39,18 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { private static final Logger logger = LoggerFactory - .getLogger(NotificationsServiceIT.class); + .getLogger(NotificationsServiceIT.class); - /** set to true to use actual connections to GCM servers */ + /** + * set to true to use actual connections to GCM servers + */ private static final boolean USE_REAL_CONNECTIONS = true; private static final String PROVIDER = USE_REAL_CONNECTIONS ? "google" : "noop"; private static final String API_KEY = "AIzaSyCIH_7WC0mOqBGMOXyQnFgrBpOePgHvQJM"; private static final String PUSH_TOKEN = "APA91bGxRGnMK8tKgVPzSlxtCFvwSVqx0xEPjA06sBmiK0k" - + "QsiwUt6ipSYF0iPRHyUgpXle0P8OlRWJADkQrcN7yxG4pLMg1CVmrqDu8tfSe63mZ-MRU2IW0cOhmo" - + "sqzC9trl33moS3OvT7qjDjkP4Qq8LYdwwYC5A"; + + "QsiwUt6ipSYF0iPRHyUgpXle0P8OlRWJADkQrcN7yxG4pLMg1CVmrqDu8tfSe63mZ-MRU2IW0cOhmo" + + "sqzC9trl33moS3OvT7qjDjkP4Qq8LYdwwYC5A"; private Notifier notifier; private Device device1, device2; @@ -56,14 +58,12 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { private QueueListener listener; - - - @BeforeClass - public static void setup(){ + public static void setup() { } + @Before public void before() throws Exception { @@ -77,8 +77,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { app.put("apiKey", API_KEY); notifier = (Notifier) app - .testRequest(ServiceAction.POST, 1, "notifiers").getEntity() - .toTypedEntity(); + .testRequest(ServiceAction.POST, 1, "notifiers").getEntity() + .toTypedEntity(); String key = notifier.getName() + NOTIFIER_ID_POSTFIX; // create devices // @@ -86,7 +86,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { app.clear(); app.put(key, PUSH_TOKEN); - Entity e = app.testRequest(ServiceAction.POST, 1, "devices") .getEntity(); + Entity e = app.testRequest(ServiceAction.POST, 1, "devices").getEntity(); app.testRequest(ServiceAction.GET, 1, "devices", e.getUuid()); device1 = app.getEntityManager().get(e.getUuid(), Device.class); @@ -103,8 +103,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { } @After - public void after(){ - if(listener!=null) { + public void after() { + if (listener != null) { listener.stop(); listener = null; } @@ -119,23 +119,23 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { app.put("environment", "development"); app.put("apiKey", API_KEY); Notifier n = (Notifier) app - .testRequest(ServiceAction.POST, 1, "notifiers").getEntity() - .toTypedEntity(); + .testRequest(ServiceAction.POST, 1, "notifiers").getEntity() + .toTypedEntity(); app.clear(); String payload = "Hello, World!"; Map<String, String> payloads = new HashMap<String, String>(1); payloads.put("foo", payload); app.put("payloads", payloads); - app.put("debug",true); + app.put("debug", true); app.put("queued", System.currentTimeMillis()); - Entity e = app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), "notifications") - .getEntity(); + Entity e = app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications") + .getEntity(); app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid()); Notification notification = app.getEntityManager().get(e.getUuid(), - Notification.class); + Notification.class); // perform push // notification = notificationWaitForComplete(notification); @@ -151,16 +151,16 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { payloads.put(notifier.getUuid().toString(), payload); app.put("payloads", payloads); app.put("queued", System.currentTimeMillis()); - app.put("debug",true); + app.put("debug", true); app.put("expire", System.currentTimeMillis() + 300000); // add 5 minutes to current time - Entity e = app.testRequest(ServiceAction.POST, 1, "devices",device1.getUuid(),"notifications").getEntity(); + Entity e = app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications").getEntity(); app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid()); Notification notification = app.getEntityManager().get(e.getUuid(), Notification.class); assertEquals( - notification.getPayloads().get(notifier.getUuid().toString()), - payload); + notification.getPayloads().get(notifier.getUuid().toString()), + payload); // perform push // notification = notificationWaitForComplete(notification); @@ -177,11 +177,11 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { payloads.put(notifier.getUuid().toString(), payload); app.put("payloads", payloads); app.put("queued", System.currentTimeMillis()); - app.put("debug",true); + app.put("debug", true); app.put("expire", System.currentTimeMillis() + 300000); // add 5 minutes to current time app.put("priority", "high"); - Entity e = app.testRequest(ServiceAction.POST, 1, "devices",device1.getUuid(),"notifications").getEntity(); + Entity e = app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications").getEntity(); app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid()); Notification notification = app.getEntityManager().get(e.getUuid(), Notification.class); @@ -204,11 +204,11 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { payloads.put(notifier.getUuid().toString(), payload); app.put("payloads", payloads); app.put("queued", System.currentTimeMillis()); - app.put("debug",true); + app.put("debug", true); app.put("expire", System.currentTimeMillis() + 300000); // add 5 minutes to current time app.put("priority", "not_a_priority"); - Entity e = app.testRequest(ServiceAction.POST, 1, "devices",device1.getUuid(),"notifications").getEntity(); + Entity e = app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications").getEntity(); app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid()); Notification notification = app.getEntityManager().get(e.getUuid(), Notification.class); @@ -232,10 +232,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { payloads.put(notifier.getUuid().toString(), payload); app.put("payloads", payloads); app.put("queued", System.currentTimeMillis()); - app.put("debug",true); + app.put("debug", true); app.put("expire", System.currentTimeMillis() + 300000); // add 5 minutes to current time - Entity e = app.testRequest(ServiceAction.POST, 1, "devices",device1.getUuid(),"notifications").getEntity(); + Entity e = app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications").getEntity(); app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid()); Notification notification = app.getEntityManager().get(e.getUuid(), Notification.class); @@ -257,10 +257,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { payloads.put(notifier.getUuid().toString(), payload); app.put("payloads", payloads); app.put("queued", System.currentTimeMillis()); - app.put("debug",true); + app.put("debug", true); app.put("expire", System.currentTimeMillis() + 300000); // add 5 minutes to current time - Entity e = app.testRequest(ServiceAction.POST, 1, "devices","*","notifications").getEntity(); + Entity e = app.testRequest(ServiceAction.POST, 1, "devices", "*", "notifications").getEntity(); app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid()); Notification notification = app.getEntityManager().get(e.getUuid(), Notification.class); @@ -285,7 +285,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { assertNotNull(user); // post an existing device to user's devices collection - Entity device = app.testRequest(ServiceAction.POST, 1, "users", user.getUuid(), "devices", device1.getUuid()).getEntity(); + Entity device = app.testRequest(ServiceAction.POST, 1, "users", user.getUuid(), "devices", device1.getUuid()).getEntity(); assertEquals(device.getUuid(), device1.getUuid()); // create and post notification @@ -294,8 +294,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { payloads.put(notifier.getUuid().toString(), payload); app.put("payloads", payloads); app.put("queued", System.currentTimeMillis()); - app.put("debug",true); - Entity e = app.testRequest(ServiceAction.POST, 1,"users",user.getUuid(), "notifications").getEntity(); + app.put("debug", true); + Entity e = app.testRequest(ServiceAction.POST, 1, "users", user.getUuid(), "notifications").getEntity(); app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid()); @@ -315,15 +315,15 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { payloads.put(notifier.getUuid().toString(), payload); app.put("payloads", payloads); app.put("queued", System.currentTimeMillis()); - app.put("debug",true); + app.put("debug", true); - Entity e = app.testRequest(ServiceAction.POST, 1, "devices","notifications") .getEntity(); + Entity e = app.testRequest(ServiceAction.POST, 1, "devices", "notifications").getEntity(); app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid()); - Notification notification = app.getEntityManager().get(e.getUuid(), Notification.class); + Notification notification = app.getEntityManager().get(e.getUuid(), Notification.class); assertEquals( - notification.getPayloads().get(notifier.getUuid().toString()), - payload); + notification.getPayloads().get(notifier.getUuid().toString()), + payload); // reduce Batch size to 1 Field field = GCMAdapter.class.getDeclaredField("BATCH_SIZE"); @@ -349,10 +349,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { app.clear(); app.put("payloads", "{asdf}"); - app.put("debug",true); + app.put("debug", true); try { - app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), "notifications"); + app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications"); fail("invalid payload should have been rejected"); } catch (IllegalArgumentException ex) { // ok @@ -364,7 +364,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { app.put("payloads", payloads); payloads.put("xxx", ""); try { - app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), "notifications"); + app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications"); fail("invalid payload should have been rejected"); } catch (IllegalArgumentException ex) { // ok @@ -379,7 +379,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { app.put("environment", "development"); app.put("apiKey", API_KEY); Entity e = app.testRequest(ServiceAction.POST, 1, "notifiers") - .getEntity(); + .getEntity(); Notifier notifier2 = app.getEntityManager().get(e.getUuid(), Notifier.class); payloads.clear(); @@ -393,14 +393,14 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { app.clear(); app.put("payloads", payloads); - app.put("debug",true); + app.put("debug", true); try { - app.testRequest(ServiceAction.POST, 1, "devices",device1.getUuid(),"notifications"); + app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications"); fail("invalid payload should have been rejected"); } catch (Exception ex) { assertEquals("java.lang.IllegalArgumentException: GCM payloads must be 4096 characters or less", - ex.getMessage()); + ex.getMessage()); // ok } } @@ -420,10 +420,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { payloads.put(notifier.getUuid().toString(), payload); app.put("payloads", payloads); app.put("queued", System.currentTimeMillis()); - app.put("debug",true); + app.put("debug", true); // create push notification - Entity e = app.testRequest(ServiceAction.POST, 1, "devices",badDevice.getUuid(),"notifications") + Entity e = app.testRequest(ServiceAction.POST, 1, "devices", badDevice.getUuid(), "notifications") .getEntity(); // validate notification was created successfully @@ -448,7 +448,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { @Test public void createGoogleNotifierWithBadAPIKey() throws Exception { - final String badKey = API_KEY+"bad"; + final String badKey = API_KEY + "bad"; // create notifier with bad API key app.clear(); @@ -457,25 +457,25 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { app.put("environment", "development"); app.put("apiKey", badKey); - try{ + try { notifier = (Notifier) app .testRequest(ServiceAction.POST, 1, "notifiers").getEntity() .toTypedEntity(); - }catch(InvalidRequestException e){ + } catch (InvalidRequestException e) { assertEquals(Constants.ERROR_INVALID_REGISTRATION, e.getDescription()); } } @Test - public void sendNotificationWithBadAPIKey() throws Exception{ - final String badKey = API_KEY+"bad"; + public void sendNotificationWithBadAPIKey() throws Exception { + final String badKey = API_KEY + "bad"; // update an existing notifier with a bad API key app.clear(); app.put("apiKey", badKey); notifier = (Notifier) app - .testRequest(ServiceAction.PUT, 1, "notifiers",notifier.getUuid()).getEntity() + .testRequest(ServiceAction.PUT, 1, "notifiers", notifier.getUuid()).getEntity() .toTypedEntity(); // create notification payload @@ -485,11 +485,11 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { payloads.put(notifier.getUuid().toString(), payload); app.put("payloads", payloads); app.put("queued", System.currentTimeMillis()); - app.put("debug",true); + app.put("debug", true); // create notification - Entity e = app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), "notifications") - .getEntity(); + Entity e = app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications") + .getEntity(); // validate notification was created successfully http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/lib/entities.js ---------------------------------------------------------------------- diff --git a/tests/integration/lib/entities.js b/tests/integration/lib/entities.js index b17fc74..e941d6f 100644 --- a/tests/integration/lib/entities.js +++ b/tests/integration/lib/entities.js @@ -117,7 +117,7 @@ function deleteAllEntities(collection, cb) { deleteAllEntities(collection, function(e) { cb(e); }); - }, 600); // Mandatory, since it seems to not retrieve entities if you make a request in < 600ms + }, 100); // add some delay }); } else { cb(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/lib/notifications.js ---------------------------------------------------------------------- diff --git a/tests/integration/lib/notifications.js b/tests/integration/lib/notifications.js index dd04864..f046bfd 100644 --- a/tests/integration/lib/notifications.js +++ b/tests/integration/lib/notifications.js @@ -28,6 +28,7 @@ module.exports.send = function(path, payload, cb) { url: urls.appendOrgCredentials(urls.getAppUrl() + path + "/notifications"), json: payload }, function(err, response, body) { + //console.log(JSON.stringify(body, null, 2)); var error = responseLib.getError(err, response); cb(error, error ? null : body.entities.pop()); }); http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/lib/notifiers.js ---------------------------------------------------------------------- diff --git a/tests/integration/lib/notifiers.js b/tests/integration/lib/notifiers.js new file mode 100644 index 0000000..33f46cb --- /dev/null +++ b/tests/integration/lib/notifiers.js @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +var request = require("request"); +var urls = require("./urls"); +var responseLib = require("./response"); +module.exports = {}; + + +module.exports.add = function(notifier, cb) { + request.put({ + url: urls.appendOrgCredentials(urls.getAppUrl() + "notifiers/" + notifier.name), + json: notifier + }, function(err, response, body) { + var error = responseLib.getError(err, response); + cb(error, error ? null : body.entities.pop()); + }); +}; + + +module.exports.get = function(notifierUUID, cb) { + request.get(urls.appendOrgCredentials(urls.getAppUrl() + "notifiers/" + notifierUUID), function(err, response, body) { + var json = JSON.parse(body); + var error = response.statusCode === 404 ? null : responseLib.getError(err, response); + cb(error, error ? null : response.statusCode === 404 ? null : json.entities.pop()); + }) +}; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/test/groups/groups.js ---------------------------------------------------------------------- diff --git a/tests/integration/test/groups/groups.js b/tests/integration/test/groups/groups.js index b822661..b56a11c 100644 --- a/tests/integration/test/groups/groups.js +++ b/tests/integration/test/groups/groups.js @@ -1,3 +1,6 @@ +/** + * Created by russo on 2/4/16. + */ /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -34,7 +37,7 @@ module.exports = { var username = "groupuser"; var password = "password"; var usersArray = []; - for (var i = 1; i <= 5; i++) { + for (var i = 0; i < 5; i++) { usersArray.push({ "username": username + "-" + i, "password": password, @@ -46,7 +49,7 @@ module.exports = { // build devices var name = "device"; var devicesArray = []; - for (var j = 1; j <= 5; j++) { + for (var j = 0; j < 5; j++) { devicesArray.push({ "name": name + "-" + j, "gcm.notifier.id": DEVICE_TOKEN @@ -55,7 +58,7 @@ module.exports = { describe("users", function () { - it("should create some devices", function (done) { + it("should create some users", function (done) { this.slow(2000); async.each(usersArray, function (user, cb) { users.add(user, function (err, user) { @@ -74,51 +77,6 @@ module.exports = { }); - describe("devices", function () { - it("should create some devices", function (done) { - this.slow(2000); - async.each(devicesArray, function (device, cb) { - devices.add(device, function (err, device) { - should(err).be.null; - device.should.not.be.null; - cb(err, device); - }); - - }, function (err) { - - done() - - }); - - }) - - }); - - - describe("user<->devices", function () { - it("should connect devices to users", function (done) { - this.slow(2000); - async.eachSeries(usersArray, function (user, cb) { - async.each(devicesArray, function (device, cb) { - connections.connect("users", user.username, "devices", device.name, null, function (err) { - cb(err, device); - }); - }); - cb(null); - - }, function (err) { - - if (err) { - console.log("error adding users " + err); - } - done(); - }); - - }) - - }); - - describe("groups", function () { it("should create some groups", function (done) { this.slow(2000); @@ -130,7 +88,6 @@ module.exports = { path: "group2" }; - console.log(" creating some groups"); groups.add(group1, function (err) { if (err) { console.log("failed to create " + "group1:" + err); http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/test/main.js ---------------------------------------------------------------------- diff --git a/tests/integration/test/main.js b/tests/integration/test/main.js index 5833607..4d8341c 100644 --- a/tests/integration/test/main.js +++ b/tests/integration/test/main.js @@ -57,8 +57,11 @@ describe("** Usergrid REST Integration Tests **", function() { describe("groups", function() { require("./groups/groups.js").test(); }); + describe("notifications", function() { + require("./notifications/notifications.js").test(); + }); after(function(done) { - this.timeout(40000); + this.timeout(180000); console.log(" teardown"); teardown.do(function(err) { should(err).be.null; http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/test/notifications/notifications.js ---------------------------------------------------------------------- diff --git a/tests/integration/test/notifications/notifications.js b/tests/integration/test/notifications/notifications.js new file mode 100644 index 0000000..07e7642 --- /dev/null +++ b/tests/integration/test/notifications/notifications.js @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +var should = require("should"); +var uuid = require("uuid"); +var users = require("../../lib/users"); +var devices = require("../../lib/devices"); +var groups = require("../../lib/groups"); +var notifiers = require("../../lib/notifiers"); +var notifications = require("../../lib/notifications"); +var connections = require("../../lib/connections"); +var async = require('async'); + +var GOOGLE_API_KEY = "AIzaSyCIH_7WC0mOqBGMOXyQnFgrBpOePgHvQJM"; +var ANDROID_DEVICE_TOKEN = "APA91bGxRGnMK8tKgVPzSlxtCFvwSVqx0xEPjA06sBmiK0kQsiwUt6ipSYF0iPRHyUgpX" + + "le0P8OlRWJADkQrcN7yxG4pLMg1CVmrqDu8tfSe63mZ-MRU2IW0cOhmosqzC9trl33moS3OvT7qjDjkP4Qq8LYdwwYC5A"; + +module.exports = { + test: function () { + + var username = "notificationuser"; + var password = "password"; + var usersArray = []; + for (var i = 0; i < 5; i++) { + usersArray.push({ + "username": username + "-" + i, + "password": password, + "name": username + "-" + i, + "email": username + "-" + i + "@uge2e.com", + "number": i + }); + } + + // build devices + var deviceName = "notificationdevice"; + var devicesArray = []; + for (var j = 0; j < 5; j++) { + devicesArray.push({ + "name": deviceName + "-" + j, + "gcm.notifier.id": ANDROID_DEVICE_TOKEN, + "number": i + }); + } + + var notifiersArray = []; + var notifier = { + name: "gcm", + provider: "google", + environment: "environment", + apiKey: GOOGLE_API_KEY + }; + notifiersArray.push(notifier); + + + var gcmNotification = { + + payloads: { + gcm: "Usergrid Integration Push Test - GCM" + } + }; + + + describe("notifiers -> GCM", function () { + it("should create a GCM notifier", function (done) { + this.slow(5000); + async.each(notifiersArray, function (notifier, cb) { + notifiers.add(notifier, function (err, notifier) { + should(err).be.null; + notifier.should.not.be.null; + cb(err, notifier); + }); + }, function (err) { + + done(); + + }); + + }) + + }); + + describe("users", function () { + it("should create some users", function (done) { + this.slow(2000); + async.each(usersArray, function (user, cb) { + users.add(user, function (err, user) { + should(err).be.null; + user.should.not.be.null; + cb(err, user); + }); + }, function (err) { + + done(); + + }); + + }) + + }); + + + describe("devices", function () { + it("should create some devices", function (done) { + this.slow(2000); + async.each(devicesArray, function (device, cb) { + devices.add(device, function (err, device) { + should(err).be.null; + device.should.not.be.null; + cb(err, device); + }); + + }, function (err) { + + done() + + }); + + }) + + }); + + + describe("user<->devices", function () { + it("should connect devices to users", function (done) { + this.slow(5000); + async.eachSeries(usersArray, function (user, cb) { + connections.connect("users", user.username, "devices", devicesArray[user.number].name, + null, function (err) { + cb(err); + }); + }, function (err) { + + if (err) { + console.log("error adding users " + err); + } + done(); + }); + + }) + + }); + + + describe("groups", function () { + it("should create some groups", function (done) { + this.slow(2000); + var group1 = { + path: "notificationgroup1" + }; + + var group2 = { + path: "notificationgroup2" + }; + + async.series([ + function (cb) { + + groups.add(group1, function (err) { + if (err) { + console.log("failed to create " + "notificationgroup1:" + err); + } + cb(err); + + }); + }, function (cb) { + + groups.add(group2, function (err) { + if (err) { + console.log("failed to create " + "notificationgroup2:" + err); + } + cb(err); + }); + + + } + ], function (err, results) { + + done(); + + }); + + + }) + + }); + + + describe("groups<->users", function () { + it("should connect users to groups", function (done) { + this.slow(2000); + async.each(usersArray, function (user, cb) { + + async.series([ + function (cb) { + connections.connect("groups", "notificationgroup1", "users", user.username, null, + function (err) { + cb(err, user); + }); + + }, + function (cb) { + connections.connect("groups", "notificationgroup2", "users", user.username, null, + function (err) { + cb(err, user); + + }); + } + + ], function (err, results) { + + cb(err); + + }); + + }, function (err) { + done(); + }); + + }) + + }); + + + // SEND NOTIFICATIONS HERE AND VALIDATE THE NUMBER OF NOTIFICATIONS SENT ARE ACCURATE FOR QUERY + + describe("notification -> user - direct path", function () { + it("should send a single notification to a user", function (done) { + this.timeout(5000) + this.slow(5000); + setTimeout(function () { + + notifications.send("users/" + usersArray[1].username, gcmNotification, + function (err, notification) { + should(err).be.null; + notification.should.not.be.null; + notification.expectedCount.should.be.equal(1); + done(); + + }); + + }, 1000) + + + }) + + }); + + describe("notification -> user - via matrix query", function () { + it("should send a single notification to a user", function (done) { + this.timeout(5000) + this.slow(5000); + + setTimeout(function () { + + notifications.send("users;ql=select * where username = 'notificationuser-0'", gcmNotification, + function (err, notification) { + should(err).be.null; + notification.should.not.be.null; + notification.expectedCount.should.be.equal(1); + done(); + + }); + + }, 1000); + + + }) + + }); + + describe("notification -> groups - via matrix query", function () { + it("should send a single notification to groups with the same users", function (done) { + this.timeout(5000) + this.slow(5000); + setTimeout(function () { + + notifications.send("groups;ql=select * where path = 'notificationgroup1' " + + "or path = 'notificationgroup2'", gcmNotification, function (err, notification) { + + should(err).be.null; + notification.should.not.be.null; + // we set up 2 groups of the same 5 users. if duplicate filtering is working, + // we'll only have 5 expected + notification.expectedCount.should.be.equal(5); + done(); + + }); + + }, 1000); + + }) + + }); + + + } +}; http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/test/teardown.js ---------------------------------------------------------------------- diff --git a/tests/integration/test/teardown.js b/tests/integration/test/teardown.js index d1031ff..24a9ddd 100644 --- a/tests/integration/test/teardown.js +++ b/tests/integration/test/teardown.js @@ -65,16 +65,32 @@ module.exports = { }) }, function(cb) { + entities.deleteAll('notifiers', function(err, body) { + should(err).be.null; + body.entities.should.be.an.instanceOf(Array).and.have.lengthOf(0); + body.count.should.equal(0); + cb(err); + }) + }, + function(cb) { entities.deleteAll('notifications', function(err, body) { should(err).be.null; body.entities.should.be.an.instanceOf(Array).and.have.lengthOf(0); body.count.should.equal(0); cb(err); }) + }, + function(cb) { + entities.deleteAll('receipts', function(err, body) { + should(err).be.null; + body.entities.should.be.an.instanceOf(Array).and.have.lengthOf(0); + body.count.should.equal(0); + cb(err); + }) } ], function(err, data) { cb(err); }); } -} \ No newline at end of file +}; \ No newline at end of file