Fix issues with notification segmenting to ensure it's more efficient.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c1375bf6 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c1375bf6 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c1375bf6 Branch: refs/heads/release-2.1.1 Commit: c1375bf6caa60f36038571c3cd0c2a0b719e36f5 Parents: 8e4d7ee Author: Michael Russo <mru...@apigee.com> Authored: Tue Apr 12 00:25:14 2016 +0200 Committer: Michael Russo <mru...@apigee.com> Committed: Tue Apr 12 00:25:14 2016 +0200 ---------------------------------------------------------------------- .../main/resources/usergrid-default.properties | 14 +++++ .../org/apache/usergrid/persistence/Query.java | 2 + .../services/notifications/QueueListener.java | 41 ++++++------ .../impl/ApplicationQueueManagerImpl.java | 65 +++++++++++--------- .../usergrid/services/queues/QueueListener.java | 2 +- .../apns/NotificationsServiceIT.java | 6 +- .../gcm/NotificationsServiceIT.java | 1 - 7 files changed, 72 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/config/src/main/resources/usergrid-default.properties ---------------------------------------------------------------------- diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties index 8b0174c..5cd7c7a 100644 --- a/stack/config/src/main/resources/usergrid-default.properties +++ b/stack/config/src/main/resources/usergrid-default.properties @@ -447,6 +447,20 @@ usergrid.scheduler.job.workers=4 usergrid.scheduler.job.queueName=/jobs +############################### Usergrid Push Notifications ############################# +# +# Usergrid processes individual push notifications asynchronously using a queue. Below are +# settings that can be used to tune this processing. + + +# Set the number of queue consumers to read from the in-region push notification queue. +# +usergrid.push.worker_count=8 + +# Set the sleep time between queue polling ( in milliseconds) +# +usergrid.push.sleep=100 + ############################### Usergrid Central SSO ############################# http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java index 52e3b4e..150a1b0 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java @@ -61,6 +61,8 @@ public class Query { public static final int DEFAULT_LIMIT = 10; + public static final int MID_LIMIT = 500; + public static final int MAX_LIMIT = 1000; public static final String PROPERTY_UUID = "uuid"; http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java index de9cf06..55d1491 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java @@ -45,10 +45,10 @@ import java.util.concurrent.atomic.AtomicLong; * Singleton listens for notifications queue messages */ public class QueueListener { - public final int MESSAGE_TRANSACTION_TIMEOUT = 25 * 1000; + private final QueueManagerFactory queueManagerFactory; - public long DEFAULT_SLEEP = 5000; + public static long DEFAULT_SLEEP = 100; private static final Logger logger = LoggerFactory.getLogger(QueueListener.class); @@ -61,9 +61,6 @@ public class QueueListener { private Properties properties; - - private ServiceManager svcMgr; - private long sleepWhenNoneFound = 0; private long sleepBetweenRuns = 0; @@ -71,8 +68,8 @@ public class QueueListener { private ExecutorService pool; private List<Future> futures; - public final int MAX_THREADS = 2; - private Integer batchSize = 10; + private static final int PUSH_CONSUMER_MAX_THREADS = 8; + public static final int MAX_TAKE = 10; private String queueName; private int consecutiveCallsToRemoveDevices; @@ -99,15 +96,14 @@ public class QueueListener { try { - sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", ""+sleepBetweenRuns)).longValue(); - sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue(); - batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize))); + sleepBetweenRuns = new Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP)); + sleepWhenNoneFound = new Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP)); consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+200)); queueName = ApplicationQueueManagerImpl.getQueueNames(properties); - int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", ""+MAX_THREADS)); + int maxThreads = new Integer(properties.getProperty("usergrid.push.worker_count", ""+PUSH_CONSUMER_MAX_THREADS)); - futures = new ArrayList<Future>(maxThreads); + futures = new ArrayList<>(maxThreads); //create our thread pool based on our threadcount. @@ -144,33 +140,40 @@ public class QueueListener { } private void execute(int threadNumber){ + if(Thread.currentThread().isDaemon()) { Thread.currentThread().setDaemon(true); } + Thread.currentThread().setName(getClass().getSimpleName()+"_PushNotifications-"+threadNumber); final AtomicInteger consecutiveExceptions = new AtomicInteger(); + if (logger.isTraceEnabled()) { logger.trace("QueueListener: Starting execute process."); } + Meter meter = metricsService.getMeter(QueueListener.class, "execute.commit"); com.codahale.metrics.Timer timer = metricsService.getTimer(QueueListener.class, "execute.dequeue"); - svcMgr = smf.getServiceManager(smf.getManagementAppId()); + if (logger.isTraceEnabled()) { logger.trace("getting from queue {} ", queueName); } + QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCAL); QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope); + // run until there are no more active jobs final AtomicLong runCount = new AtomicLong(0); + //cache to retrieve push manager, cached per notifier, so many notifications will get same push manager LoadingCache<UUID, ApplicationQueueManager> queueManagerMap = getQueueManagerCache(queueManager); while ( true ) { Timer.Context timerContext = timer.time(); - rx.Observable.from(queueManager.getMessages(getBatchSize(), ApplicationQueueMessage.class)) - .buffer(getBatchSize()) + rx.Observable.from(queueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class)) + .buffer(MAX_TAKE) .doOnNext(messages -> { try { @@ -329,12 +332,4 @@ public class QueueListener { pool.shutdownNow(); } - - public void setBatchSize(int batchSize){ - this.batchSize = batchSize; - } - public int getBatchSize(){return batchSize;} - - - } http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index 04e60b7..6c28d2f 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -120,7 +120,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { final UUID appId = em.getApplication().getUuid(); final Map<String, Object> payloads = notification.getPayloads(); - final Func1<EntityRef, ApplicationQueueMessage> sendMessageFunction = deviceRef -> { + final Func1<EntityRef, Optional<ApplicationQueueMessage>> sendMessageFunction = deviceRef -> { try { long now = System.currentTimeMillis(); @@ -145,7 +145,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { if (notifierId == null) { //TODO need to leverage optional here - //return deviceRef; + return Optional.empty(); } ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId); @@ -157,15 +157,14 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { } deviceCount.incrementAndGet(); - return message; + return Optional.of(message); } catch (Exception deviceLoopException) { logger.error("Failed to add device", deviceLoopException); errorMessages.add("Failed to add device: " + deviceRef.getUuid() + ", error:" + deviceLoopException); - //TODO need an optional here - return new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), "test", "test"); + return Optional.empty(); } }; @@ -185,13 +184,21 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { applicationQueueMessages.forEach( message -> { try { - - qm.sendMessage( message ); - queueMeter.mark(); + if(message.isPresent()){ + qm.sendMessage( message.get() ); + queueMeter.mark(); + } } catch (IOException e) { - logger.error("Unable to queue notification for notification UUID {} and device UUID {} ", - message.getNotificationId(), message.getDeviceId()); + + if(message.isPresent()){ + logger.error("Unable to queue notification for notification UUID {} and device UUID {} ", + message.get().getNotificationId(), message.get().getDeviceId()); + } + else{ + logger.error("Unable to queue notification as it's not present when trying to send to queue"); + } + } }); @@ -200,7 +207,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }) .doOnError(throwable -> logger.error("Failed while trying to send notification", throwable)); - processMessagesObservable.toBlocking(); // let this run and block the async thread, messages are queued + processMessagesObservable.toBlocking().last(); // let this run and block the async thread, messages are queued } @@ -513,6 +520,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { List<EntityRef> devices = new ArrayList<>(); + final int LIMIT = Query.MID_LIMIT; + try { @@ -529,52 +538,48 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { initial = false; - final List<EntityRef> mydevices = em.getCollection(ref, "devices", start, Query.DEFAULT_LIMIT, + final List<EntityRef> mydevices = em.getCollection(ref, "devices", start, LIMIT, Query.Level.REFS, true).getRefs(); resultSize = mydevices.size(); + if(mydevices.size() > 0){ start = mydevices.get(mydevices.size() - 1 ).getUuid(); } - devices.addAll( mydevices ); - } } else if ("group".equals(ref.getType())) { - //devices = new ArrayList<>(); UUID start = null; boolean initial = true; int resultSize = 0; - while( initial || resultSize >= Query.DEFAULT_LIMIT){ + while( initial || resultSize >= LIMIT){ - initial = false; - final List<EntityRef> myusers = em.getCollection(ref, "users", start, - Query.DEFAULT_LIMIT, Query.Level.REFS, true).getRefs(); + initial = false; + final List<EntityRef> myusers = em.getCollection(ref, "users", start, + LIMIT, Query.Level.REFS, true).getRefs(); - resultSize = myusers.size(); - if(myusers.size() > 0){ - start = myusers.get(myusers.size() - 1 ).getUuid(); - } + resultSize = myusers.size(); + if(myusers.size() > 0){ + start = myusers.get(myusers.size() - 1 ).getUuid(); + } - // don't allow a single user to have more than 100 devices? - for (EntityRef user : myusers) { - devices.addAll( em.getCollection(user, "devices", null, 100, - Query.Level.REFS, true).getRefs() ); + // don't allow a single user to have more than 100 devices? + for (EntityRef user : myusers) { + devices.addAll( em.getCollection(user, "devices", null, 100, + Query.Level.REFS, true).getRefs() ); - } + } } - - } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java index 5895d38..9d95d87 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java @@ -110,7 +110,7 @@ public abstract class QueueListener { try { sleepBetweenRuns = new Long(properties.getProperty("usergrid.queues.listener.sleep.between", ""+sleepBetweenRuns)).longValue(); sleepWhenNoneFound = new Long(properties.getProperty("usergrid.queues.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue(); - batchSize = new Integer(properties.getProperty("usergrid.queues.listener.batchSize", (""+batchSize))); + batchSize = new Integer(properties.getProperty("usergrid.queues.listener.MAX_TAKE", (""+batchSize))); consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.queues.inactive.interval", ""+200)); queueName = getQueueName(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java ---------------------------------------------------------------------- diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java index dea4e49..3923827 100644 --- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java +++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java @@ -129,7 +129,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { public void after() throws Exception { if(listener != null) { listener.stop(); - listener = null; } } @@ -683,8 +682,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { final int NUM_DEVICES = 50; // perform push // - int oldBatchSize = listener.getBatchSize(); - listener.setBatchSize(10); + int oldBatchSize = QueueListener.MAX_TAKE; app.clear(); app.put("name", UUID.randomUUID().toString()); @@ -724,7 +722,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { try { notificationWaitForComplete(notification); } finally { - listener.setBatchSize( oldBatchSize); + //noop } // check receipts // http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java ---------------------------------------------------------------------- diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java index 65cc54a..1c7915a 100644 --- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java +++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java @@ -106,7 +106,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT { public void after() { if (listener != null) { listener.stop(); - listener = null; } }