Add caching to the queue manager factory so that we don't create multiple 
local or SNS queue manager instances for the same queue.  Enhance the node.js 
integration tests.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/853d6486
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/853d6486
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/853d6486

Branch: refs/heads/asf-site
Commit: 853d6486f82c51610bec6dc52d6fbde2cfe2da1a
Parents: 9b8d1dc
Author: Michael Russo <michaelaru...@gmail.com>
Authored: Fri Feb 5 15:28:06 2016 -0800
Committer: Michael Russo <michaelaru...@gmail.com>
Committed: Fri Feb 5 15:28:06 2016 -0800

----------------------------------------------------------------------
 .../persistence/entities/Notification.java      |   2 +-
 .../usergrid/persistence/queue/QueueFig.java    |   2 +-
 .../queue/impl/QueueManagerFactoryImpl.java     |  51 ++-
 .../queue/impl/SNSQueueManagerImpl.java         |   4 +-
 .../notifications/ApplicationQueueManager.java  |   9 +-
 .../services/notifications/TaskManager.java     |  68 ++--
 .../impl/ApplicationQueueManagerImpl.java       | 389 +++++++++----------
 .../gcm/NotificationsServiceIT.java             | 114 +++---
 tests/integration/lib/entities.js               |   2 +-
 tests/integration/lib/notifications.js          |   1 +
 tests/integration/lib/notifiers.js              |  43 ++
 tests/integration/test/groups/groups.js         |  55 +--
 tests/integration/test/main.js                  |   5 +-
 .../test/notifications/notifications.js         | 313 +++++++++++++++
 tests/integration/test/teardown.js              |  18 +-
 15 files changed, 712 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
 
b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index 34c7758..5c3ee89 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -90,7 +90,7 @@ public class Notification extends TypedEntity {
     @EntityProperty
     protected String priority;
 
-    /** Error messages that may have been encounted by Usergrid when trying to 
process the notification */
+    /** Error messages that may have been encountered by Usergrid when trying 
to process the notification */
     @EntityProperty
     protected String errorMessage;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index ad38f6d..cdab3e0 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -72,7 +72,7 @@ public interface QueueFig extends GuicyFig {
 
     // current msg size 1.2kb * 850000 = 1.02 GB (let this default be the most 
we'll queue in heap)
     @Key("usergrid.queue.publish.queuesize")
-    @Default("850000")
+    @Default("250000")
     int getAsyncQueueSize();
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
index 0f78678..de9cac5 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
@@ -19,12 +19,18 @@
  */
 package org.apache.usergrid.persistence.queue.impl;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.queue.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 /**
  * manages whether we take in an external in memory override for queues.
@@ -32,10 +38,36 @@ import java.util.Map;
 @Singleton
 public class QueueManagerFactoryImpl implements QueueManagerFactory {
 
+    private static final Logger logger = LoggerFactory.getLogger( 
QueueManagerFactoryImpl.class );
 
     private final QueueFig queueFig;
     private final QueueManagerInternalFactory queuemanagerInternalFactory;
     private final Map<String,QueueManager> defaultManager;
+    private final LoadingCache<QueueScope, QueueManager> queueManager =
+        CacheBuilder
+            .newBuilder()
+            .initialCapacity(5)
+            .maximumSize(100)
+            .build(new CacheLoader<QueueScope, QueueManager>() {
+
+                @Override
+                public QueueManager load( QueueScope scope ) throws Exception {
+
+                    if ( queueFig.overrideQueueForDefault() ){
+
+                        QueueManager manager = defaultManager.get( 
scope.getName() );
+                        if ( manager == null ) {
+                            manager = new LocalQueueManager();
+                            defaultManager.put( scope.getName(), manager );
+                        }
+                        return manager;
+
+                    } else {
+                        return 
queuemanagerInternalFactory.getQueueManager(scope);
+                    }
+
+                }
+            });
 
     @Inject
     public QueueManagerFactoryImpl(final QueueFig queueFig, final 
QueueManagerInternalFactory queuemanagerInternalFactory){
@@ -43,17 +75,18 @@ public class QueueManagerFactoryImpl implements 
QueueManagerFactory {
         this.queuemanagerInternalFactory = queuemanagerInternalFactory;
         this.defaultManager = new HashMap<>(10);
     }
+
     @Override
     public QueueManager getQueueManager(QueueScope scope) {
-        if(queueFig.overrideQueueForDefault()){
-            QueueManager manager = defaultManager.get(scope.getName());
-            if(manager==null){
-                manager = new LocalQueueManager();
-                defaultManager.put(scope.getName(),manager);
-            }
-            return manager;
-        }else{
-            return queuemanagerInternalFactory.getQueueManager(scope);
+
+        try {
+            return queueManager.get(scope);
+
+        } catch (ExecutionException e) {
+
+            logger.error("Unable to load or retrieve queue manager from cache 
for queue {}", scope.getName());
+            throw new RuntimeException(e);
         }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 3a1f045..8a503a5 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -564,7 +564,8 @@ public class SNSQueueManagerImpl implements QueueManager {
             logger.error( "SQS client is null, perhaps it failed to initialize 
successfully" );
             return;
         }
-
+        final long startSend = System.currentTimeMillis();
+        logger.info("starting send message");
         final String stringBody = toString( body );
 
         String url = getReadQueue().getUrl();
@@ -575,6 +576,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
         SendMessageRequest request = new SendMessageRequest( url, stringBody );
 
+        logger.info("now sending.  time spent since starting to send in ms: 
{}", System.currentTimeMillis() - startSend);
         sqsAsync.sendMessageAsync( request, new 
AsyncHandler<SendMessageRequest, SendMessageResult>() {
 
             @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 6bbd117..3f0ca69 100644
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -32,14 +32,15 @@ import java.util.List;
  */
 public interface ApplicationQueueManager {
 
-    public static final String DEFAULT_QUEUE_PROPERTY = 
"usergrid.notifications.listener.queue";
+    String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queue";
 
-    public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
+    String NOTIFIER_ID_POSTFIX = ".notifier.id";
 
-    public static final  String DEFAULT_QUEUE_NAME = "push"; //keep this short 
as AWS limits queue name size to 80 chars
+    String DEFAULT_QUEUE_NAME = "push"; //keep this short as AWS limits queue 
name size to 80 chars
 
     /**
      * send notification to queue
+     *
      * @param notification
      * @param jobExecution
      * @throws Exception
@@ -48,6 +49,7 @@ public interface ApplicationQueueManager {
 
     /**
      * send notifications to providers
+     *
      * @param messages
      * @param queuePath
      * @return
@@ -61,6 +63,7 @@ public interface ApplicationQueueManager {
 
     /**
      * check for inactive devices, apple and google require this
+     *
      * @throws Exception
      */
     void asyncCheckForInactiveDevices() throws Exception;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 4f051e6..950447a 100644
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -89,10 +89,10 @@ public class TaskManager {
             if (logger.isTraceEnabled()) {
                 logger.trace("COUNT is: {}", successes.get());
             }
-            if (hasFinished) { //process has finished but notifications are 
still coming in
-                finishedBatch();
-
-            }
+//            if (hasFinished) { //process has finished but notifications are 
still coming in
+//                finishedBatch();
+//
+//            }
         }
     }
 
@@ -115,6 +115,7 @@ public class TaskManager {
             }
         } finally {
             completed(notifier, deviceUUID);
+            finishedBatch();
         }
     }
 
@@ -128,7 +129,7 @@ public class TaskManager {
                 Receipt savedReceipt = em.create(receipt);
                 receipt.setUuid(savedReceipt.getUuid());
                 List<EntityRef> entities = Arrays.asList(notification, device);
-//              em.addToCollections(entities, 
Notification.RECEIPTS_COLLECTION, savedReceipt);
+                em.addToCollections(entities, 
Notification.RECEIPTS_COLLECTION, savedReceipt);
             } else {
                 em.update(receipt);
             }
@@ -150,51 +151,34 @@ public class TaskManager {
             }
         }
     }
+
     public void finishedBatch() throws Exception {
-        finishedBatch(true,false);
+        finishedBatch(true);
     }
-    public void finishedBatch(boolean fetch, boolean force) throws Exception {
-
-        if (notification.getDebug() || getFailures() > 0 || force) {
-            long successes = this.successes.get(); //reset counters
-            long failures = this.failures.get(); //reset counters
-
-            for (int i = 0; i < successes; i++) {
-                this.successes.decrementAndGet();
-            }
 
-            for (int i = 0; i < failures; i++) {
-                this.failures.decrementAndGet();
-            }
-
-            this.hasFinished = true;
+    public void finishedBatch(boolean refreshNotification) throws Exception {
 
-            // refresh notification
-            if (fetch)
-                notification = em.get(this.notification.getUuid(), 
Notification.class);
+        long successes = this.successes.get(); //reset counters
+        long failures = this.failures.get(); //reset counters
 
-            //and write them out again, this will produce the most accurate 
count
-            Map<String, Long> stats = new HashMap<>(2);
-            stats.put("sent", successes);
-            stats.put("errors", failures);
-            notification.updateStatistics(successes, failures);
+        for (int i = 0; i < successes; i++) {
+            this.successes.decrementAndGet();
+        }
+        for (int i = 0; i < failures; i++) {
+            this.failures.decrementAndGet();
+        }
 
-            long totals = (notification.getStatistics().get("sent") + 
notification.getStatistics().get("errors"));
-            //none of this is known and should you ever do this
-            notification.setModified(System.currentTimeMillis());
-            notification.setFinished(notification.getModified());
+        this.hasFinished = true;
 
-            Map<String, Object> properties = new HashMap<>();
-            properties.put("finished", notification.getModified());
-            properties.put("state", notification.getState());
-            notification.addProperties(properties);
+        // force refresh notification by fetching it
+        if (refreshNotification) {
+            notification = em.get(this.notification.getUuid(), 
Notification.class);
+        }
 
-            long latency = notification.getFinished() - 
notification.getStarted();
-            logger.info("notification finished batch: {} of {} devices in {} 
ms", notification.getUuid(), totals, latency);
+        notification.updateStatistics(successes, failures);
+        notification.setModified(System.currentTimeMillis());
+        notification.setFinished(notification.getModified());
 
-            em.update(notification);
-//        Set<Notifier> notifiers = new 
HashSet<>(proxy.getAdapterMap().values()); // remove dups
-//        proxy.asyncCheckForInactiveDevices(notifiers);
-        }
+        em.update(notification);
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index c956417..d0f8ca8 100644
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -56,7 +56,7 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
     HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve 
notifiers once
 
 
-    public ApplicationQueueManagerImpl(JobScheduler jobScheduler, 
EntityManager entityManager, QueueManager queueManager, MetricsFactory 
metricsFactory, Properties properties){
+    public ApplicationQueueManagerImpl(JobScheduler jobScheduler, 
EntityManager entityManager, QueueManager queueManager, MetricsFactory 
metricsFactory, Properties properties) {
         this.em = entityManager;
         this.qm = queueManager;
         this.jobScheduler = jobScheduler;
@@ -67,13 +67,13 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
 
     }
 
-    private boolean scheduleQueueJob(Notification notification) throws 
Exception{
+    private boolean scheduleQueueJob(Notification notification) throws 
Exception {
         return jobScheduler.scheduleQueueJob(notification);
     }
 
     @Override
     public void queueNotification(final Notification notification, final 
JobExecution jobExecution) throws Exception {
-        if(scheduleQueueJob(notification)){
+        if (scheduleQueueJob(notification)) {
             em.update(notification);
             return;
         }
@@ -94,110 +94,97 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
             logger.trace("notification {} start queuing", 
notification.getUuid());
         }
 
-        final PathQuery<Device> pathQuery = 
notification.getPathQuery().buildPathQuery() ; //devices query
+        final PathQuery<Device> pathQuery = 
notification.getPathQuery().buildPathQuery(); //devices query
         final AtomicInteger deviceCount = new AtomicInteger(); //count devices 
so you can make a judgement on batching
-        final ConcurrentLinkedQueue<String> errorMessages = new 
ConcurrentLinkedQueue<String>(); //build up list of issues
+        final ConcurrentLinkedQueue<String> errorMessages = new 
ConcurrentLinkedQueue<>(); //build up list of issues
 
 
         //get devices in querystring, and make sure you have access
         if (pathQuery != null) {
-            final HashMap<Object,ProviderAdapter> notifierMap =  
getAdapterMap();
+            final HashMap<Object, ProviderAdapter> notifierMap = 
getAdapterMap();
             if (logger.isTraceEnabled()) {
                 logger.trace("notification {} start query", 
notification.getUuid());
             }
             final Iterator<Device> iterator = pathQuery.iterator(em);
+
             //if there are more pages (defined by PAGE_SIZE) you probably want 
this to be async, also if this is already a job then don't reschedule
             if (iterator instanceof ResultsIterator && ((ResultsIterator) 
iterator).hasPages() && jobExecution == null) {
+                if(logger.isTraceEnabled()){
+                    logger.trace("Scheduling notification job as it has 
multiple pages of devices.");
+                }
                 jobScheduler.scheduleQueueJob(notification, true);
                 em.update(notification);
                 return;
             }
             final UUID appId = em.getApplication().getUuid();
-            final Map<String,Object> payloads = notification.getPayloads();
-
-            final Func1<Entity,Entity> entityListFunct = entity -> {
+            final Map<String, Object> payloads = notification.getPayloads();
 
+            final Func1<EntityRef, EntityRef> sendMessageFunction = deviceRef 
-> {
                 try {
 
                     long now = System.currentTimeMillis();
-                    List<EntityRef> devicesRef = getDevices(entity); // 
resolve group
 
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("notification {} queue  {} devices, 
duration {} ms", notification.getUuid(), devicesRef.size(), 
(System.currentTimeMillis() - now));
-                    }
+                    String notifierId = null;
+                    String notifierKey = null;
 
-                    for (EntityRef deviceRef : devicesRef) {
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("notification {} starting to queue 
device {} ", notification.getUuid(), deviceRef.getUuid());
+                    //find the device notifier info, match it to the payload
+                    for (Map.Entry<String, Object> entry : 
payloads.entrySet()) {
+                        ProviderAdapter adapter = 
notifierMap.get(entry.getKey().toLowerCase());
+                        now = System.currentTimeMillis();
+                        String providerId = getProviderId(deviceRef, 
adapter.getNotifier());
+                        if (providerId != null) {
+                            notifierId = providerId;
+                            notifierKey = entry.getKey().toLowerCase();
+                            break;
                         }
-                        String notifierId = null;
-                        String notifierKey = null;
-
-                        //find the device notifier info, match it to the 
payload
-                        for (Map.Entry<String, Object> entry : 
payloads.entrySet()) {
-                            ProviderAdapter adapter = 
notifierMap.get(entry.getKey().toLowerCase());
-                            now = System.currentTimeMillis();
-                            String providerId = getProviderId(deviceRef, 
adapter.getNotifier());
-                            if (providerId != null) {
-                                notifierId = providerId;
-                                notifierKey = entry.getKey().toLowerCase();
-                                break;
-                            }
-                            if (logger.isTraceEnabled()) {
-                                logger.trace("Provider query for notification 
{} device {} took {} ms", notification.getUuid(), deviceRef.getUuid(), 
(System.currentTimeMillis() - now));
-                            }
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Provider query for notification {} 
device {} took {} ms", notification.getUuid(), deviceRef.getUuid(), 
(System.currentTimeMillis() - now));
                         }
+                    }
 
-                        if (notifierId == null) {
-                            logger.info("Notifier did not match for device {} 
", deviceRef);
-                            continue;
-                        }
+                    if (notifierId == null) {
+                        return deviceRef;
+                    }
+
+                    ApplicationQueueMessage message = new 
ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), 
notifierKey, notifierId);
+                    if (notification.getQueued() == null) {
+
+                        // update queued time
+                        notification.setQueued(System.currentTimeMillis());
 
-                        ApplicationQueueMessage message = new 
ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), 
notifierKey, notifierId);
-                        if (notification.getQueued() == null) {
-                            // update queued time
-                            now = System.currentTimeMillis();
-                            notification.setQueued(System.currentTimeMillis());
-                            if (logger.isTraceEnabled()) {
-                                logger.trace("notification {} device {} queue 
time set. duration {} ms", notification.getUuid(), deviceRef.getUuid(), 
(System.currentTimeMillis() - now));
-                            }
-                        }
-                        now = System.currentTimeMillis();
-                        qm.sendMessage(message);
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("notification {} post-queue to device 
{} duration {} ms, {} queue", notification.getUuid(), deviceRef.getUuid(), 
(System.currentTimeMillis() - now), queueName);
-                        }
-                        deviceCount.incrementAndGet();
-                        queueMeter.mark();
                     }
+                    qm.sendMessage(message);
+                    deviceCount.incrementAndGet();
+                    queueMeter.mark();
+
+
                 } catch (Exception deviceLoopException) {
-                    logger.error("Failed to add devices", deviceLoopException);
-                    errorMessages.add("Failed to add devices for entity: " + 
entity.getUuid() + " error:" + deviceLoopException);
+                    logger.error("Failed to add device", deviceLoopException);
+                    errorMessages.add("Failed to add device: " + 
deviceRef.getUuid() + ", error:" + deviceLoopException);
                 }
-                return entity;
+                return deviceRef;
             };
 
-            long now = System.currentTimeMillis();
 
 
             //process up to 10 concurrently
-            Observable o = rx.Observable.create( new 
IteratorObservable<Entity>( iterator ) )
-                .distinct( entity -> entity.getUuid() )
-                .flatMap(entity ->
-                    Observable.just(entity).map(entityListFunct)
-                        .doOnError(throwable -> logger.error("Failed while 
writing", throwable)) , 10);
+            Observable processMessagesObservable = Observable.create(new 
IteratorObservable<Entity>(iterator))
+                .flatMap(entity -> {
+                    return Observable.from(getDevices(entity));
+                }, 10)
+                .distinct(ref -> ref.getUuid())
+                .map(sendMessageFunction)
+                .doOnError(throwable -> logger.error("Failed while trying to 
send notification", throwable));
+
+            processMessagesObservable.toBlocking().lastOrDefault(null);
 
-            o.toBlocking().lastOrDefault( null );
-            if (logger.isTraceEnabled()) {
-                logger.trace("notification {} done queueing duration {} ms", 
notification.getUuid(), System.currentTimeMillis() - now);
-            }
         }
 
         // update queued time
         Map<String, Object> properties = new HashMap<>(2);
         properties.put("queued", notification.getQueued());
         properties.put("state", notification.getState());
-        if(errorMessages.size()>0){
+        if (errorMessages.size() > 0) {
             if (notification.getErrorMessage() == null) {
                 notification.setErrorMessage("There was a problem delivering 
all of your notifications. See deliveryErrors in properties");
             }
@@ -205,40 +192,33 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
 
         notification.setExpectedCount(deviceCount.get());
         notification.addProperties(properties);
-        long now = System.currentTimeMillis();
-
+        em.update(notification);
 
-        logger.info("notification {} updated notification duration {} ms", 
notification.getUuid(), System.currentTimeMillis() - now);
 
-        //do i have devices, and have i already started batching.
-        if (deviceCount.get() <= 0 || !notification.getDebug()) {
+        // if no devices, go ahead and mark the batch finished
+        if (deviceCount.get() <= 0 ) {
             TaskManager taskManager = new TaskManager(em, notification);
-            //if i'm in a test value will be false, do not mark finished for 
test orchestration, not ideal need real tests
-            taskManager.finishedBatch(false,true);
-        }else {
-            em.update(notification);
+            taskManager.finishedBatch(true);
         }
 
-        long elapsed = notification.getQueued() != null ? 
notification.getQueued() - startTime : 0;
-        if (logger.isTraceEnabled()) {
-            logger.trace("notification {} done queuing to {} devices in {} 
ms", notification.getUuid().toString(), deviceCount.get(), elapsed);
-        }
+
     }
 
     /**
      * only need to get notifiers once. will reset on next batch
+     *
      * @return
      */
-    private HashMap<Object,ProviderAdapter> getAdapterMap(){
-        if(notifierHashMap == null) {
+    private HashMap<Object, ProviderAdapter> getAdapterMap() {
+        if (notifierHashMap == null) {
             long now = System.currentTimeMillis();
-            notifierHashMap = new HashMap<Object, ProviderAdapter>();
+            notifierHashMap = new HashMap<>();
             Query query = new Query();
             query.setCollection("notifiers");
             query.setLimit(100);
-            PathQuery<Notifier> pathQuery = new PathQuery<Notifier>(
-                    new SimpleEntityRef(em.getApplicationRef()),
-                    query
+            PathQuery<Notifier> pathQuery = new PathQuery<>(
+                new SimpleEntityRef(em.getApplicationRef()),
+                query
             );
             Iterator<Notifier> notifierIterator = pathQuery.iterator(em);
             int count = 0;
@@ -246,22 +226,22 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
                 Notifier notifier = notifierIterator.next();
                 String name = notifier.getName() != null ? notifier.getName() 
: "";
                 UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : 
UUID.randomUUID();
-                ProviderAdapter providerAdapter = 
ProviderAdapterFactory.getProviderAdapter(notifier,em);
+                ProviderAdapter providerAdapter = 
ProviderAdapterFactory.getProviderAdapter(notifier, em);
                 notifierHashMap.put(name.toLowerCase(), providerAdapter);
                 notifierHashMap.put(uuid, providerAdapter);
                 notifierHashMap.put(uuid.toString(), providerAdapter);
-                if(count++ >= 100){
+                if (count++ >= 100) {
                     logger.error("ApplicationQueueManager: too many 
notifiers...breaking out ", notifierHashMap.size());
                     break;
                 }
             }
-            logger.info("ApplicationQueueManager: fetching notifiers finished 
size={}, duration {} ms", notifierHashMap.size(),System.currentTimeMillis() - 
now);
         }
         return notifierHashMap;
     }
 
     /**
      * send batches of notifications to provider
+     *
      * @param messages
      * @throws Exception
      */
@@ -273,128 +253,123 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
 
         final Map<Object, ProviderAdapter> notifierMap = getAdapterMap();
         final ApplicationQueueManagerImpl proxy = this;
-        final ConcurrentHashMap<UUID,TaskManager> taskMap = new 
ConcurrentHashMap<UUID, TaskManager>(messages.size());
-        final ConcurrentHashMap<UUID,Notification> notificationMap = new 
ConcurrentHashMap<UUID, Notification>(messages.size());
-
-        final Func1<QueueMessage, ApplicationQueueMessage> func = new 
Func1<QueueMessage, ApplicationQueueMessage>() {
-            @Override
-            public ApplicationQueueMessage call(QueueMessage queueMessage) {
-                boolean messageCommitted = false;
-                ApplicationQueueMessage message = null;
-                try {
-                    message = (ApplicationQueueMessage) queueMessage.getBody();
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("start sending notification for device {} 
for Notification: {} on thread {}", message.getDeviceId(), 
message.getNotificationId(), Thread.currentThread().getId());
-                    }
+        final ConcurrentHashMap<UUID, TaskManager> taskMap = new 
ConcurrentHashMap<UUID, TaskManager>(messages.size());
+        final ConcurrentHashMap<UUID, Notification> notificationMap = new 
ConcurrentHashMap<UUID, Notification>(messages.size());
 
-                    UUID deviceUUID = message.getDeviceId();
+        final Func1<QueueMessage, ApplicationQueueMessage> func = queueMessage 
-> {
+            boolean messageCommitted = false;
+            ApplicationQueueMessage message = null;
+            try {
+                message = (ApplicationQueueMessage) queueMessage.getBody();
+                if (logger.isTraceEnabled()) {
+                    logger.trace("start sending notification for device {} for 
Notification: {} on thread {}", message.getDeviceId(), 
message.getNotificationId(), Thread.currentThread().getId());
+                }
 
-                    Notification notification = 
notificationMap.get(message.getNotificationId());
-                    if (notification == null) {
-                        notification = em.get(message.getNotificationId(), 
Notification.class);
-                        notificationMap.put(message.getNotificationId(), 
notification);
-                    }
-                    TaskManager taskManager = 
taskMap.get(message.getNotificationId());
-                    if (taskManager == null) {
-                        taskManager = new TaskManager(em, notification);
-                        taskMap.putIfAbsent(message.getNotificationId(), 
taskManager);
-                        taskManager = taskMap.get(message.getNotificationId());
-                    }
+                UUID deviceUUID = message.getDeviceId();
 
-                    final Map<String, Object> payloads = 
notification.getPayloads();
-                    final Map<String, Object> translatedPayloads = 
translatePayloads(payloads, notifierMap);
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("sending notification for device {} for 
Notification: {}", deviceUUID, notification.getUuid());
-                    }
+                Notification notification = 
notificationMap.get(message.getNotificationId());
+                if (notification == null) {
+                    notification = em.get(message.getNotificationId(), 
Notification.class);
+                    notificationMap.put(message.getNotificationId(), 
notification);
+                }
+                TaskManager taskManager = 
taskMap.get(message.getNotificationId());
+                if (taskManager == null) {
+                    taskManager = new TaskManager(em, notification);
+                    taskMap.putIfAbsent(message.getNotificationId(), 
taskManager);
+                    taskManager = taskMap.get(message.getNotificationId());
+                }
 
-                    try {
-                        String notifierName = 
message.getNotifierKey().toLowerCase();
-                        ProviderAdapter providerAdapter = 
notifierMap.get(notifierName.toLowerCase());
-                        Object payload = translatedPayloads.get(notifierName);
-                        Receipt receipt = new Receipt(notification.getUuid(), 
message.getNotifierId(), payload, deviceUUID);
-                        TaskTracker tracker = new 
TaskTracker(providerAdapter.getNotifier(), taskManager, receipt, deviceUUID);
-                        if(!isOkToSend(notification)){
-                             tracker.failed(0, "Notification is 
duplicate/expired/cancelled.");
-                        }else {
-                            if (payload == null) {
-                                if (logger.isDebugEnabled()) {
-                                    logger.debug("selected device {} for 
notification {} doesn't have a valid payload. skipping.", deviceUUID, 
notification.getUuid());
-                                }
-                                tracker.failed(0, "failed to match payload to 
" + message.getNotifierId() + " notifier");
-                            } else {
-                                long now = System.currentTimeMillis();
-                                try {
-                                    
providerAdapter.sendNotification(message.getNotifierId(), payload, 
notification, tracker);
-                                } catch (Exception e) {
-                                    tracker.failed(0, e.getMessage());
-                                } finally {
-                                    if (logger.isTraceEnabled()) {
-                                        logger.trace("sending to device {} for 
Notification: {} duration {} ms", deviceUUID, notification.getUuid(), 
(System.currentTimeMillis() - now));
-                                    }
+                final Map<String, Object> payloads = 
notification.getPayloads();
+                final Map<String, Object> translatedPayloads = 
translatePayloads(payloads, notifierMap);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("sending notification for device {} for 
Notification: {}", deviceUUID, notification.getUuid());
+                }
+
+                try {
+                    String notifierName = 
message.getNotifierKey().toLowerCase();
+                    ProviderAdapter providerAdapter = 
notifierMap.get(notifierName.toLowerCase());
+                    Object payload = translatedPayloads.get(notifierName);
+                    Receipt receipt = new Receipt(notification.getUuid(), 
message.getNotifierId(), payload, deviceUUID);
+                    TaskTracker tracker = new 
TaskTracker(providerAdapter.getNotifier(), taskManager, receipt, deviceUUID);
+                    if (!isOkToSend(notification)) {
+                        tracker.failed(0, "Notification is 
duplicate/expired/cancelled.");
+                    } else {
+                        if (payload == null) {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("selected device {} for 
notification {} doesn't have a valid payload. skipping.", deviceUUID, 
notification.getUuid());
+                            }
+                            tracker.failed(0, "failed to match payload to " + 
message.getNotifierId() + " notifier");
+                        } else {
+                            long now = System.currentTimeMillis();
+                            try {
+                                
providerAdapter.sendNotification(message.getNotifierId(), payload, 
notification, tracker);
+                            } catch (Exception e) {
+                                tracker.failed(0, e.getMessage());
+                            } finally {
+                                if (logger.isTraceEnabled()) {
+                                    logger.trace("sending to device {} for 
Notification: {} duration {} ms", deviceUUID, notification.getUuid(), 
(System.currentTimeMillis() - now));
                                 }
                             }
                         }
-                        messageCommitted = true;
-                    } finally {
-                        sendMeter.mark();
                     }
+                    messageCommitted = true;
+                } finally {
+                    sendMeter.mark();
+                }
 
-                } catch (Exception e) {
-                    logger.error("Failure while sending",e);
-                    try {
-                        if(!messageCommitted && queuePath != null) {
-                            qm.commitMessage(queueMessage);
-                        }
-                    }catch (Exception queueException){
-                        logger.error("Failed to commit 
message.",queueException);
+            } catch (Exception e) {
+                logger.error("Failure while sending", e);
+                try {
+                    if (!messageCommitted && queuePath != null) {
+                        qm.commitMessage(queueMessage);
                     }
+                } catch (Exception queueException) {
+                    logger.error("Failed to commit message.", queueException);
                 }
-                return message;
             }
+            return message;
         };
 
         //from each queue message, process them in parallel up to 10 at a time
-        Observable o = rx.Observable.from( messages ).flatMap( queueMessage -> 
{
+        Observable queueMessageObservable = 
Observable.from(messages).flatMap(queueMessage -> {
 
 
-            return Observable.just( queueMessage ).map( func ).buffer( 
messages.size() ).map( queueMessages -> {
+            return 
Observable.just(queueMessage).map(func).buffer(messages.size()).map(queueMessages
 -> {
                 //for gcm this will actually send notification
-                for ( ProviderAdapter providerAdapter : notifierMap.values() ) 
{
+                for (ProviderAdapter providerAdapter : notifierMap.values()) {
                     try {
                         providerAdapter.doneSendingNotifications();
-                    }
-                    catch ( Exception e ) {
-                        logger.error( 
"providerAdapter.doneSendingNotifications: ", e );
+                    } catch (Exception e) {
+                        
logger.error("providerAdapter.doneSendingNotifications: ", e);
                     }
                 }
                 //TODO: check if a notification is done and mark it
                 HashMap<UUID, ApplicationQueueMessage> notifications = new 
HashMap<>();
-                for ( ApplicationQueueMessage message : queueMessages ) {
-                    if ( notifications.get( message.getNotificationId() ) == 
null ) {
+                for (ApplicationQueueMessage message : queueMessages) {
+                    if (notifications.get(message.getNotificationId()) == 
null) {
                         try {
-                            TaskManager taskManager = taskMap.get( 
message.getNotificationId() );
-                            notifications.put( message.getNotificationId(), 
message );
+                            TaskManager taskManager = 
taskMap.get(message.getNotificationId());
+                            notifications.put(message.getNotificationId(), 
message);
                             taskManager.finishedBatch();
-                        }
-                        catch ( Exception e ) {
-                            logger.error( "Failed to finish batch", e );
+                        } catch (Exception e) {
+                            logger.error("Failed to finish batch", e);
                         }
                     }
                 }
                 return notifications;
-            } ).doOnError( throwable -> logger.error( "Failed while sending", 
throwable ) );
-        }, 10 );
+            }).doOnError(throwable -> logger.error("Failed while sending", 
throwable));
+        }, 10);
 
-        return o;
+        return queueMessageObservable;
     }
 
     @Override
-    public void stop(){
-        for(ProviderAdapter adapter : getAdapterMap().values()){
+    public void stop() {
+        for (ProviderAdapter adapter : getAdapterMap().values()) {
             try {
                 adapter.stop();
-            }catch (Exception e){
-                logger.error("failed to stop adapter",e);
+            } catch (Exception e) {
+                logger.error("failed to stop adapter", e);
             }
         }
     }
@@ -407,7 +382,7 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
      */
     private Map<String, Object> translatePayloads(Map<String, Object> 
payloads, Map<Object, ProviderAdapter>
         notifierMap) throws Exception {
-        Map<String, Object> translatedPayloads = new HashMap<String, Object>(  
payloads.size());
+        Map<String, Object> translatedPayloads = new HashMap<String, 
Object>(payloads.size());
         for (Map.Entry<String, Object> entry : payloads.entrySet()) {
             String payloadKey = entry.getKey().toLowerCase();
             Object payloadValue = entry.getValue();
@@ -431,10 +406,13 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
 
     private static final class IteratorObservable<T> implements 
rx.Observable.OnSubscribe<T> {
         private final Iterator<T> input;
-        private IteratorObservable( final Iterator input ) {this.input = 
input;}
+
+        private IteratorObservable(final Iterator input) {
+            this.input = input;
+        }
 
         @Override
-        public void call( final Subscriber<? super T> subscriber ) {
+        public void call(final Subscriber<? super T> subscriber) {
 
             /**
              * You would replace this code with your file reading.  Instead of 
emitting from an iterator,
@@ -442,17 +420,16 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
              */
 
             try {
-                while ( !subscriber.isUnsubscribed() && input.hasNext() ) {
+                while (!subscriber.isUnsubscribed() && input.hasNext()) {
                     //send our input to the next
-                    subscriber.onNext( (T) input.next() );
+                    subscriber.onNext((T) input.next());
                 }
 
                 //tell the subscriber we don't have any more data
                 subscriber.onCompleted();
-            }
-            catch ( Throwable t ) {
-                logger.error("failed on subscriber",t);
-                subscriber.onError( t );
+            } catch (Throwable t) {
+                logger.error("failed on subscriber", t);
+                subscriber.onError(t);
             }
         }
     }
@@ -483,45 +460,61 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
 
 
     private boolean isOkToSend(Notification notification) {
-        Map<String,Long> stats = notification.getStatistics();
-        if (stats != null && notification.getExpectedCount() == 
(stats.get("sent")+ stats.get("errors"))) {
+        Map<String, Long> stats = notification.getStatistics();
+        if (stats != null && notification.getExpectedCount() == 
(stats.get("sent") + stats.get("errors"))) {
             if (logger.isDebugEnabled()) {
                 logger.debug("notification {} already processed. not sending.",
-                        notification.getUuid());
+                    notification.getUuid());
             }
             return false;
         }
         if (notification.getCanceled() == Boolean.TRUE) {
             if (logger.isDebugEnabled()) {
                 logger.debug("notification {} canceled. not sending.",
-                        notification.getUuid());
+                    notification.getUuid());
             }
             return false;
         }
         if (notification.isExpired()) {
             if (logger.isDebugEnabled()) {
                 logger.debug("notification {} expired. not sending.",
-                        notification.getUuid());
+                    notification.getUuid());
             }
             return false;
         }
         return true;
     }
 
-    private List<EntityRef> getDevices(EntityRef ref) throws Exception {
+    private List<EntityRef> getDevices(EntityRef ref) {
+
         List<EntityRef> devices = Collections.EMPTY_LIST;
-        if ("device".equals(ref.getType())) {
-            devices = Collections.singletonList(ref);
-        } else if ("user".equals(ref.getType())) {
-            devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT,
+
+        try {
+            if ("device".equals(ref.getType())) {
+                devices = Collections.singletonList(ref);
+            } else if ("user".equals(ref.getType())) {
+                devices = em.getCollection(ref, "devices", null, 
Query.MAX_LIMIT,
                     Query.Level.REFS, false).getRefs();
-        } else if ("group".equals(ref.getType())) {
-            devices = new ArrayList<EntityRef>();
-            for (EntityRef r : em.getCollection(ref, "users", null,
+            } else if ("group".equals(ref.getType())) {
+                devices = new ArrayList<>();
+                for (EntityRef r : em.getCollection(ref, "users", null,
                     Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) {
-                devices.addAll(getDevices(r));
+                    devices.addAll(getDevices(r));
+                }
+            }
+        } catch (Exception e) {
+
+            if (ref != null){
+                logger.error("Error while retrieving devices for entity type 
{} and uuid {}. Error: {}",
+                    ref.getType(), ref.getUuid(), e);
+            }else{
+                logger.error("Error while retrieving devices. Entity ref was 
null.");
             }
+
+            throw new RuntimeException("Unable to retrieve devices for 
EntityRef", e);
+
         }
+
         return devices;
     }
 
@@ -534,7 +527,7 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
             }
             return value != null ? value.toString() : null;
         } catch (Exception e) {
-            logger.error("Errer getting provider ID, proceding with rest of 
batch", e);
+            logger.error("Error getting provider ID, proceeding with rest of 
batch", e);
             return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
 
b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 97513be..65cc54a 100644
--- 
a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ 
b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -39,16 +39,18 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
 
 
     private static final Logger logger = LoggerFactory
-            .getLogger(NotificationsServiceIT.class);
+        .getLogger(NotificationsServiceIT.class);
 
-    /** set to true to use actual connections to GCM servers */
+    /**
+     * set to true to use actual connections to GCM servers
+     */
     private static final boolean USE_REAL_CONNECTIONS = true;
     private static final String PROVIDER = USE_REAL_CONNECTIONS ? "google" : 
"noop";
 
     private static final String API_KEY = 
"AIzaSyCIH_7WC0mOqBGMOXyQnFgrBpOePgHvQJM";
     private static final String PUSH_TOKEN = 
"APA91bGxRGnMK8tKgVPzSlxtCFvwSVqx0xEPjA06sBmiK0k"
-            + 
"QsiwUt6ipSYF0iPRHyUgpXle0P8OlRWJADkQrcN7yxG4pLMg1CVmrqDu8tfSe63mZ-MRU2IW0cOhmo"
-            + "sqzC9trl33moS3OvT7qjDjkP4Qq8LYdwwYC5A";
+        + 
"QsiwUt6ipSYF0iPRHyUgpXle0P8OlRWJADkQrcN7yxG4pLMg1CVmrqDu8tfSe63mZ-MRU2IW0cOhmo"
+        + "sqzC9trl33moS3OvT7qjDjkP4Qq8LYdwwYC5A";
 
     private Notifier notifier;
     private Device device1, device2;
@@ -56,14 +58,12 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
     private QueueListener listener;
 
 
-
-
-
     @BeforeClass
-    public static void setup(){
+    public static void setup() {
 
 
     }
+
     @Before
     public void before() throws Exception {
 
@@ -77,8 +77,8 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         app.put("apiKey", API_KEY);
 
         notifier = (Notifier) app
-                .testRequest(ServiceAction.POST, 1, "notifiers").getEntity()
-                .toTypedEntity();
+            .testRequest(ServiceAction.POST, 1, "notifiers").getEntity()
+            .toTypedEntity();
         String key = notifier.getName() + NOTIFIER_ID_POSTFIX;
 
         // create devices //
@@ -86,7 +86,7 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         app.clear();
         app.put(key, PUSH_TOKEN);
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, "devices") 
.getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, 
"devices").getEntity();
         app.testRequest(ServiceAction.GET, 1, "devices", e.getUuid());
 
         device1 = app.getEntityManager().get(e.getUuid(), Device.class);
@@ -103,8 +103,8 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
     }
 
     @After
-    public void after(){
-        if(listener!=null) {
+    public void after() {
+        if (listener != null) {
             listener.stop();
             listener = null;
         }
@@ -119,23 +119,23 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         app.put("environment", "development");
         app.put("apiKey", API_KEY);
         Notifier n = (Notifier) app
-                .testRequest(ServiceAction.POST, 1, "notifiers").getEntity()
-                .toTypedEntity();
+            .testRequest(ServiceAction.POST, 1, "notifiers").getEntity()
+            .toTypedEntity();
 
         app.clear();
         String payload = "Hello, World!";
         Map<String, String> payloads = new HashMap<String, String>(1);
         payloads.put("foo", payload);
         app.put("payloads", payloads);
-        app.put("debug",true);
+        app.put("debug", true);
         app.put("queued", System.currentTimeMillis());
 
-        Entity e = app.testRequest(ServiceAction.POST, 
1,"devices",device1.getUuid(), "notifications")
-                .getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", 
device1.getUuid(), "notifications")
+            .getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
         Notification notification = app.getEntityManager().get(e.getUuid(),
-                Notification.class);
+            Notification.class);
 
         // perform push //
         notification = notificationWaitForComplete(notification);
@@ -151,16 +151,16 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
         app.put("expire", System.currentTimeMillis() + 300000); // add 5 
minutes to current time
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, 
"devices",device1.getUuid(),"notifications").getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", 
device1.getUuid(), "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
         Notification notification = app.getEntityManager().get(e.getUuid(), 
Notification.class);
         assertEquals(
-                notification.getPayloads().get(notifier.getUuid().toString()),
-                payload);
+            notification.getPayloads().get(notifier.getUuid().toString()),
+            payload);
 
         // perform push //
         notification = notificationWaitForComplete(notification);
@@ -177,11 +177,11 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
         app.put("expire", System.currentTimeMillis() + 300000); // add 5 
minutes to current time
         app.put("priority", "high");
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, 
"devices",device1.getUuid(),"notifications").getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", 
device1.getUuid(), "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
         Notification notification = app.getEntityManager().get(e.getUuid(), 
Notification.class);
@@ -204,11 +204,11 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
         app.put("expire", System.currentTimeMillis() + 300000); // add 5 
minutes to current time
         app.put("priority", "not_a_priority");
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, 
"devices",device1.getUuid(),"notifications").getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", 
device1.getUuid(), "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
         Notification notification = app.getEntityManager().get(e.getUuid(), 
Notification.class);
@@ -232,10 +232,10 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
         app.put("expire", System.currentTimeMillis() + 300000); // add 5 
minutes to current time
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, 
"devices",device1.getUuid(),"notifications").getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", 
device1.getUuid(), "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
         Notification notification = app.getEntityManager().get(e.getUuid(), 
Notification.class);
@@ -257,10 +257,10 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
         app.put("expire", System.currentTimeMillis() + 300000); // add 5 
minutes to current time
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, 
"devices","*","notifications").getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", "*", 
"notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
         Notification notification = app.getEntityManager().get(e.getUuid(), 
Notification.class);
@@ -285,7 +285,7 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         assertNotNull(user);
 
         // post an existing device to user's devices collection
-        Entity device = app.testRequest(ServiceAction.POST, 1, "users",  
user.getUuid(), "devices", device1.getUuid()).getEntity();
+        Entity device = app.testRequest(ServiceAction.POST, 1, "users", 
user.getUuid(), "devices", device1.getUuid()).getEntity();
         assertEquals(device.getUuid(), device1.getUuid());
 
         // create and post notification
@@ -294,8 +294,8 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
-        Entity e = app.testRequest(ServiceAction.POST, 
1,"users",user.getUuid(), "notifications").getEntity();
+        app.put("debug", true);
+        Entity e = app.testRequest(ServiceAction.POST, 1, "users", 
user.getUuid(), "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
 
@@ -315,15 +315,15 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, 
"devices","notifications")   .getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", 
"notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
-        Notification notification = app.getEntityManager().get(e.getUuid(),  
Notification.class);
+        Notification notification = app.getEntityManager().get(e.getUuid(), 
Notification.class);
         assertEquals(
-                notification.getPayloads().get(notifier.getUuid().toString()),
-                payload);
+            notification.getPayloads().get(notifier.getUuid().toString()),
+            payload);
 
         // reduce Batch size to 1
         Field field = GCMAdapter.class.getDeclaredField("BATCH_SIZE");
@@ -349,10 +349,10 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
 
         app.clear();
         app.put("payloads", "{asdf}");
-        app.put("debug",true);
+        app.put("debug", true);
 
         try {
-            app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), 
"notifications");
+            app.testRequest(ServiceAction.POST, 1, "devices", 
device1.getUuid(), "notifications");
             fail("invalid payload should have been rejected");
         } catch (IllegalArgumentException ex) {
             // ok
@@ -364,7 +364,7 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         app.put("payloads", payloads);
         payloads.put("xxx", "");
         try {
-            app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), 
"notifications");
+            app.testRequest(ServiceAction.POST, 1, "devices", 
device1.getUuid(), "notifications");
             fail("invalid payload should have been rejected");
         } catch (IllegalArgumentException ex) {
             // ok
@@ -379,7 +379,7 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         app.put("environment", "development");
         app.put("apiKey", API_KEY);
         Entity e = app.testRequest(ServiceAction.POST, 1, "notifiers")
-                .getEntity();
+            .getEntity();
         Notifier notifier2 = app.getEntityManager().get(e.getUuid(), 
Notifier.class);
 
         payloads.clear();
@@ -393,14 +393,14 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
 
         app.clear();
         app.put("payloads", payloads);
-        app.put("debug",true);
+        app.put("debug", true);
 
         try {
-            app.testRequest(ServiceAction.POST, 1, 
"devices",device1.getUuid(),"notifications");
+            app.testRequest(ServiceAction.POST, 1, "devices", 
device1.getUuid(), "notifications");
             fail("invalid payload should have been rejected");
         } catch (Exception ex) {
             assertEquals("java.lang.IllegalArgumentException: GCM payloads 
must be 4096 characters or less",
-                    ex.getMessage());
+                ex.getMessage());
             // ok
         }
     }
@@ -420,10 +420,10 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
 
         // create push notification
-        Entity e = app.testRequest(ServiceAction.POST, 1, 
"devices",badDevice.getUuid(),"notifications")
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", 
badDevice.getUuid(), "notifications")
             .getEntity();
 
         // validate notification  was created successfully
@@ -448,7 +448,7 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
     @Test
     public void createGoogleNotifierWithBadAPIKey() throws Exception {
 
-        final String badKey = API_KEY+"bad";
+        final String badKey = API_KEY + "bad";
 
         // create notifier with bad API key
         app.clear();
@@ -457,25 +457,25 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         app.put("environment", "development");
         app.put("apiKey", badKey);
 
-        try{
+        try {
             notifier = (Notifier) app
                 .testRequest(ServiceAction.POST, 1, "notifiers").getEntity()
                 .toTypedEntity();
-        }catch(InvalidRequestException e){
+        } catch (InvalidRequestException e) {
             assertEquals(Constants.ERROR_INVALID_REGISTRATION, 
e.getDescription());
         }
 
     }
 
     @Test
-    public void sendNotificationWithBadAPIKey() throws Exception{
-        final String badKey = API_KEY+"bad";
+    public void sendNotificationWithBadAPIKey() throws Exception {
+        final String badKey = API_KEY + "bad";
 
         // update an existing notifier with a bad API key
         app.clear();
         app.put("apiKey", badKey);
         notifier = (Notifier) app
-            .testRequest(ServiceAction.PUT, 1, 
"notifiers",notifier.getUuid()).getEntity()
+            .testRequest(ServiceAction.PUT, 1, "notifiers", 
notifier.getUuid()).getEntity()
             .toTypedEntity();
 
         // create notification payload
@@ -485,11 +485,11 @@ public class NotificationsServiceIT extends 
AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
 
         // create notification
-        Entity e = app.testRequest(ServiceAction.POST, 
1,"devices",device1.getUuid(), "notifications")
-                .getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", 
device1.getUuid(), "notifications")
+            .getEntity();
 
 
         // validate notification  was created successfully

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/lib/entities.js
----------------------------------------------------------------------
diff --git a/tests/integration/lib/entities.js 
b/tests/integration/lib/entities.js
index b17fc74..e941d6f 100644
--- a/tests/integration/lib/entities.js
+++ b/tests/integration/lib/entities.js
@@ -117,7 +117,7 @@ function deleteAllEntities(collection, cb) {
                     deleteAllEntities(collection, function(e) {
                         cb(e);
                     });
-                }, 600); // Mandatory, since it seems to not retrieve entities 
if you make a request in < 600ms
+                }, 100); // add some delay
             });
         } else {
             cb();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/lib/notifications.js
----------------------------------------------------------------------
diff --git a/tests/integration/lib/notifications.js 
b/tests/integration/lib/notifications.js
index dd04864..f046bfd 100644
--- a/tests/integration/lib/notifications.js
+++ b/tests/integration/lib/notifications.js
@@ -28,6 +28,7 @@ module.exports.send = function(path, payload, cb) {
         url: urls.appendOrgCredentials(urls.getAppUrl() + path + 
"/notifications"),
         json: payload
     }, function(err, response, body) {
+        //console.log(JSON.stringify(body, null, 2));
         var error = responseLib.getError(err, response);
         cb(error, error ? null : body.entities.pop());
     });

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/lib/notifiers.js
----------------------------------------------------------------------
diff --git a/tests/integration/lib/notifiers.js 
b/tests/integration/lib/notifiers.js
new file mode 100644
index 0000000..33f46cb
--- /dev/null
+++ b/tests/integration/lib/notifiers.js
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+var request = require("request");
+var urls = require("./urls");
+var responseLib = require("./response");
+module.exports = {};
+
+
+/**
+ * PUTs a notifier definition to "notifiers/<notifier.name>".
+ *
+ * @param {Object}   notifier definition sent as the JSON request body; its
+ *                            "name" property forms the last path segment
+ * @param {Function} cb       invoked as cb(error, entity); entity is the last
+ *                            element of the response's "entities" array, or
+ *                            null when an error was detected
+ */
+module.exports.add = function(notifier, cb) {
+    var target = urls.appendOrgCredentials(urls.getAppUrl() + "notifiers/" + notifier.name);
+
+    request.put({ url: target, json: notifier }, function(err, response, body) {
+        var error = responseLib.getError(err, response);
+        if (error) {
+            cb(error, null);
+            return;
+        }
+        cb(error, body.entities.pop());
+    });
+};
+
+
+/**
+ * Fetches a single notifier by UUID.
+ *
+ * A 404 response is treated as "no such notifier" rather than as a failure.
+ *
+ * @param {String}   notifierUUID identifier appended to the "notifiers/" path
+ * @param {Function} cb           invoked as cb(error, entity):
+ *                                - (null, null) when the server answers 404
+ *                                - (error, null) on a transport, HTTP or
+ *                                  JSON-parsing failure
+ *                                - (error, entity) otherwise, where error is
+ *                                  the falsy result of responseLib.getError and
+ *                                  entity is the last element of "entities"
+ */
+module.exports.get = function(notifierUUID, cb) {
+    var target = urls.appendOrgCredentials(urls.getAppUrl() + "notifiers/" + notifierUUID);
+
+    request.get(target, function(err, response, body) {
+        // Only look at the status code when a response actually arrived; on a
+        // transport error "response" is undefined.
+        if (!err && response && response.statusCode === 404) {
+            return cb(null, null);
+        }
+
+        var error = responseLib.getError(err, response);
+        if (error) {
+            return cb(error, null);
+        }
+
+        // Parse only after the error checks so a missing or non-JSON body is
+        // reported through the callback instead of throwing inside it.
+        var json;
+        try {
+            json = JSON.parse(body);
+        } catch (parseError) {
+            return cb(parseError, null);
+        }
+        cb(error, json.entities.pop());
+    });
+};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/test/groups/groups.js
----------------------------------------------------------------------
diff --git a/tests/integration/test/groups/groups.js 
b/tests/integration/test/groups/groups.js
index b822661..b56a11c 100644
--- a/tests/integration/test/groups/groups.js
+++ b/tests/integration/test/groups/groups.js
@@ -1,3 +1,6 @@
+/**
+ * Created by russo on 2/4/16.
+ */
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -34,7 +37,7 @@ module.exports = {
         var username = "groupuser";
         var password = "password";
         var usersArray = [];
-        for (var i = 1; i <= 5; i++) {
+        for (var i = 0; i < 5; i++) {
             usersArray.push({
                 "username": username + "-" + i,
                 "password": password,
@@ -46,7 +49,7 @@ module.exports = {
         // build devices
         var name = "device";
         var devicesArray = [];
-        for (var j = 1; j <= 5; j++) {
+        for (var j = 0; j < 5; j++) {
             devicesArray.push({
                 "name": name + "-" + j,
                 "gcm.notifier.id": DEVICE_TOKEN
@@ -55,7 +58,7 @@ module.exports = {
 
 
         describe("users", function () {
-            it("should create some devices", function (done) {
+            it("should create some users", function (done) {
                 this.slow(2000);
                 async.each(usersArray, function (user, cb) {
                     users.add(user, function (err, user) {
@@ -74,51 +77,6 @@ module.exports = {
         });
 
 
-        describe("devices", function () {
-            it("should create some devices", function (done) {
-                this.slow(2000);
-                async.each(devicesArray, function (device, cb) {
-                    devices.add(device, function (err, device) {
-                        should(err).be.null;
-                        device.should.not.be.null;
-                        cb(err, device);
-                    });
-
-                }, function (err) {
-
-                    done()
-
-                });
-
-            })
-
-        });
-
-
-        describe("user<->devices", function () {
-            it("should connect devices to users", function (done) {
-                this.slow(2000);
-                async.eachSeries(usersArray, function (user, cb) {
-                    async.each(devicesArray, function (device, cb) {
-                        connections.connect("users", user.username, "devices", 
device.name, null, function (err) {
-                            cb(err, device);
-                        });
-                    });
-                    cb(null);
-
-                }, function (err) {
-
-                    if (err) {
-                        console.log("error adding users " + err);
-                    }
-                    done();
-                });
-
-            })
-
-        });
-
-
         describe("groups", function () {
             it("should create some groups", function (done) {
                 this.slow(2000);
@@ -130,7 +88,6 @@ module.exports = {
                     path: "group2"
                 };
 
-                console.log("        creating some groups");
                 groups.add(group1, function (err) {
                     if (err) {
                         console.log("failed to create " + "group1:" + err);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/test/main.js
----------------------------------------------------------------------
diff --git a/tests/integration/test/main.js b/tests/integration/test/main.js
index 5833607..4d8341c 100644
--- a/tests/integration/test/main.js
+++ b/tests/integration/test/main.js
@@ -57,8 +57,11 @@ describe("** Usergrid REST Integration Tests **", function() 
{
     describe("groups", function() {
         require("./groups/groups.js").test();
     });
+    describe("notifications", function() {
+        require("./notifications/notifications.js").test();
+    });
     after(function(done) {
-        this.timeout(40000);
+        this.timeout(180000);
         console.log("    teardown");
         teardown.do(function(err) {
             should(err).be.null;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/test/notifications/notifications.js
----------------------------------------------------------------------
diff --git a/tests/integration/test/notifications/notifications.js 
b/tests/integration/test/notifications/notifications.js
new file mode 100644
index 0000000..07e7642
--- /dev/null
+++ b/tests/integration/test/notifications/notifications.js
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+var should = require("should");
+var uuid = require("uuid");
+var users = require("../../lib/users");
+var devices = require("../../lib/devices");
+var groups = require("../../lib/groups");
+var notifiers = require("../../lib/notifiers");
+var notifications = require("../../lib/notifications");
+var connections = require("../../lib/connections");
+var async = require('async');
+
+// Push credentials used by the notification tests. Each value can be supplied
+// through an environment variable of the same name; the literals are kept as
+// defaults so runs without those variables behave exactly as before.
+// NOTE(review): the default API key looks like a real credential -- confirm it
+// is a dedicated test key or has been revoked.
+var GOOGLE_API_KEY = process.env.GOOGLE_API_KEY || "AIzaSyCIH_7WC0mOqBGMOXyQnFgrBpOePgHvQJM";
+var ANDROID_DEVICE_TOKEN = process.env.ANDROID_DEVICE_TOKEN ||
+    "APA91bGxRGnMK8tKgVPzSlxtCFvwSVqx0xEPjA06sBmiK0kQsiwUt6ipSYF0iPRHyUgpX" +
+    "le0P8OlRWJADkQrcN7yxG4pLMg1CVmrqDu8tfSe63mZ-MRU2IW0cOhmosqzC9trl33moS3OvT7qjDjkP4Qq8LYdwwYC5A";
+
+module.exports = {
+    test: function () {
+
+        // Fixture users "notificationuser-0" .. "notificationuser-4". "number" is
+        // the user's index; the user<->devices step uses it to pick a device.
+        var username = "notificationuser";
+        var password = "password";
+        var usersArray = [];
+        for (var i = 0; i < 5; i++) {
+            usersArray.push({
+                "username": username + "-" + i,
+                "password": password,
+                "name": username + "-" + i,
+                "email": username + "-" + i + "@uge2e.com",
+                "number": i
+            });
+        }
+
+        // build devices
+        var deviceName = "notificationdevice";
+        var devicesArray = [];
+        for (var j = 0; j < 5; j++) {
+            devicesArray.push({
+                "name": deviceName + "-" + j,
+                "gcm.notifier.id": ANDROID_DEVICE_TOKEN,
+                // Use this loop's own counter. The previous code stored "i", which
+                // is already 5 here, so every device got the same number.
+                "number": j
+            });
+        }
+
+        // Single GCM notifier; the notification payload below is keyed by its name.
+        // NOTE(review): the Java integration test for notifiers uses
+        // environment "development" -- confirm "environment" is intended here.
+        var notifiersArray = [];
+        var notifier = {
+            name: "gcm",
+            provider: "google",
+            environment: "environment",
+            apiKey: GOOGLE_API_KEY
+        };
+        notifiersArray.push(notifier);
+
+
+        // Notification body reused by every send test in this file.
+        var gcmNotification = {
+
+            payloads: {
+                gcm: "Usergrid Integration Push Test - GCM"
+            }
+        };
+
+
+        describe("notifiers -> GCM", function () {
+            it("should create a GCM notifier", function (done) {
+                this.slow(5000);
+                // Create every notifier listed in notifiersArray.
+                async.each(notifiersArray, function (notifier, cb) {
+                    notifiers.add(notifier, function (err, notifier) {
+                        should(err).be.null;
+                        notifier.should.not.be.null;
+                        cb(err);
+                    });
+                }, function (err) {
+                    // Hand any failure to mocha instead of dropping it.
+                    done(err);
+                });
+            });
+        });
+
+        describe("users", function () {
+            it("should create some users", function (done) {
+                this.slow(2000);
+                // Add every fixture user from usersArray.
+                async.each(usersArray, function (user, cb) {
+                    users.add(user, function (err, user) {
+                        should(err).be.null;
+                        user.should.not.be.null;
+                        cb(err);
+                    });
+                }, function (err) {
+                    // Report a failed creation to mocha rather than discarding it.
+                    done(err);
+                });
+            });
+        });
+
+
+        describe("devices", function () {
+            it("should create some devices", function (done) {
+                this.slow(2000);
+                // Add every fixture device from devicesArray.
+                async.each(devicesArray, function (device, cb) {
+                    devices.add(device, function (err, device) {
+                        should(err).be.null;
+                        device.should.not.be.null;
+                        cb(err);
+                    });
+                }, function (err) {
+                    // Report a failed creation to mocha rather than discarding it.
+                    done(err);
+                });
+            });
+        });
+
+
+        describe("user<->devices", function () {
+            it("should connect devices to users", function (done) {
+                this.slow(5000);
+                // Pair each user with the device at the same index (user.number),
+                // issuing the connection requests one after another.
+                async.eachSeries(usersArray, function (user, cb) {
+                    connections.connect("users", user.username, "devices", devicesArray[user.number].name,
+                        null, function (err) {
+                            cb(err);
+                        });
+                }, function (err) {
+                    if (err) {
+                        console.log("error connecting devices to users " + err);
+                    }
+                    // Fail the test on error; previously the error was only logged.
+                    done(err);
+                });
+            });
+        });
+
+
+        describe("groups", function () {
+            it("should create some groups", function (done) {
+                this.slow(2000);
+                var group1 = {
+                    path: "notificationgroup1"
+                };
+
+                var group2 = {
+                    path: "notificationgroup2"
+                };
+
+                // Create the two groups one after the other.
+                async.series([
+                    function (cb) {
+                        groups.add(group1, function (err) {
+                            if (err) {
+                                console.log("failed to create " + "notificationgroup1:" + err);
+                            }
+                            cb(err);
+                        });
+                    },
+                    function (cb) {
+                        groups.add(group2, function (err) {
+                            if (err) {
+                                console.log("failed to create " + "notificationgroup2:" + err);
+                            }
+                            cb(err);
+                        });
+                    }
+                ], function (err) {
+                    // Fail the test when either group could not be created.
+                    done(err);
+                });
+            });
+        });
+
+
+        describe("groups<->users", function () {
+            it("should connect users to groups", function (done) {
+                this.slow(2000);
+                // Every user joins both groups, so the two groups end up with the
+                // same members.
+                async.each(usersArray, function (user, cb) {
+
+                    async.series([
+                        function (next) {
+                            connections.connect("groups", "notificationgroup1", "users", user.username, null,
+                                function (err) {
+                                    next(err, user);
+                                });
+                        },
+                        function (next) {
+                            connections.connect("groups", "notificationgroup2", "users", user.username, null,
+                                function (err) {
+                                    next(err, user);
+                                });
+                        }
+                    ], function (err) {
+                        cb(err);
+                    });
+
+                }, function (err) {
+                    // Fail the test when any connection request failed.
+                    done(err);
+                });
+            });
+        });
+
+
+        // SEND NOTIFICATIONS HERE AND VALIDATE THAT THE NUMBER OF NOTIFICATIONS SENT IS ACCURATE FOR EACH QUERY
+
+        describe("notification -> user - direct path", function () {
+            it("should send a single notification to a user", function (done) {
+                this.timeout(5000);
+                this.slow(5000);
+
+                var path = "users/" + usersArray[1].username;
+
+                // Checks the notification entity returned by the send call.
+                function verify(err, notification) {
+                    should(err).be.null;
+                    notification.should.not.be.null;
+                    notification.expectedCount.should.be.equal(1);
+                    done();
+                }
+
+                // Send after a one-second delay.
+                setTimeout(function () {
+                    notifications.send(path, gcmNotification, verify);
+                }, 1000);
+            });
+        });
+
+        describe("notification -> user - via matrix query", function () {
+            it("should send a single notification to a user", function (done) {
+                this.timeout(5000);
+                this.slow(5000);
+
+                var path = "users;ql=select * where username = 'notificationuser-0'";
+
+                // Checks the notification entity returned by the send call.
+                function verify(err, notification) {
+                    should(err).be.null;
+                    notification.should.not.be.null;
+                    notification.expectedCount.should.be.equal(1);
+                    done();
+                }
+
+                // Send after a one-second delay.
+                setTimeout(function () {
+                    notifications.send(path, gcmNotification, verify);
+                }, 1000);
+            });
+        });
+
+        describe("notification -> groups - via matrix query", function () {
+            it("should send a single notification to groups with the same users", function (done) {
+                this.timeout(5000);
+                this.slow(5000);
+
+                var path = "groups;ql=select * where path = 'notificationgroup1' " +
+                    "or path = 'notificationgroup2'";
+
+                // Checks the notification entity returned by the send call.
+                function verify(err, notification) {
+                    should(err).be.null;
+                    notification.should.not.be.null;
+                    // Two groups were set up with the same 5 users. If duplicate
+                    // filtering is working, only 5 are expected.
+                    notification.expectedCount.should.be.equal(5);
+                    done();
+                }
+
+                // Send after a one-second delay.
+                setTimeout(function () {
+                    notifications.send(path, gcmNotification, verify);
+                }, 1000);
+            });
+        });
+
+
+    }
+};

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/test/teardown.js
----------------------------------------------------------------------
diff --git a/tests/integration/test/teardown.js 
b/tests/integration/test/teardown.js
index d1031ff..24a9ddd 100644
--- a/tests/integration/test/teardown.js
+++ b/tests/integration/test/teardown.js
@@ -65,16 +65,32 @@ module.exports = {
                     })
                 },
                 function(cb) {
+                    entities.deleteAll('notifiers', function(err, body) {
+                        should(err).be.null;
+                        
body.entities.should.be.an.instanceOf(Array).and.have.lengthOf(0);
+                        body.count.should.equal(0);
+                        cb(err);
+                    })
+                },
+                function(cb) {
                     entities.deleteAll('notifications', function(err, body) {
                         should(err).be.null;
                         
body.entities.should.be.an.instanceOf(Array).and.have.lengthOf(0);
                         body.count.should.equal(0);
                         cb(err);
                     })
+                },
+                function(cb) {
+                    entities.deleteAll('receipts', function(err, body) {
+                        should(err).be.null;
+                        
body.entities.should.be.an.instanceOf(Array).and.have.lengthOf(0);
+                        body.count.should.equal(0);
+                        cb(err);
+                    })
                 }
             ],
             function(err, data) {
                 cb(err);
             });
     }
-}
\ No newline at end of file
+};
\ No newline at end of file

Reply via email to