Temporarily disable notification counters and back to Schedulers.io()
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/df9abc4d Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/df9abc4d Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/df9abc4d Branch: refs/heads/master Commit: df9abc4d0c16af25dbfd75ea544bc9953b7addc7 Parents: 81eb251 Author: Michael Russo <mru...@apigee.com> Authored: Wed Apr 20 18:01:57 2016 -0700 Committer: Michael Russo <mru...@apigee.com> Committed: Wed Apr 20 18:01:57 2016 -0700 ---------------------------------------------------------------------- .../services/notifications/TaskManager.java | 4 +- .../impl/ApplicationQueueManagerImpl.java | 58 ++++++++++++-------- 2 files changed, 36 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/df9abc4d/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java index e908c3b..870cae9 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java @@ -62,7 +62,7 @@ public class TaskManager { //random date and time for format - incrementNotificationCounter( "completed" ); + //incrementNotificationCounter( "completed" ); EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID); @@ -100,7 +100,7 @@ public class TaskManager { try { - incrementNotificationCounter( "failed" ); + //incrementNotificationCounter( "failed" ); if (logger.isDebugEnabled()) { logger.debug("Notification {} for device {} got error {}", notification.getUuid(), deviceUUID, code); http://git-wip-us.apache.org/repos/asf/usergrid/blob/df9abc4d/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index 5254fd6..778307c 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -62,7 +62,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { - private final ExecutorService asyncExecutor; + //private final ExecutorService asyncExecutor; @@ -78,6 +78,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { this.queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue"); this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send"); + /** int maxAsyncThreads; int workerQueueSize; @@ -102,7 +103,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { .createTaskExecutor( "push-device-io", maxAsyncThreads, workerQueueSize, TaskExecutorFactory.RejectionAction.CALLERRUNS ); - + **/ } private boolean scheduleQueueJob(Notification notification) throws Exception { @@ -269,7 +270,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { return Observable.from(entities); }) - .distinct( deviceRef -> deviceRef.getUuid()) .filter( device -> { if(logger.isTraceEnabled()) { @@ -306,37 +306,47 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }) .map(sendMessageFunction) - .doOnNext( message -> { - try { + .subscribeOn(Schedulers.io()); - if(message.isPresent()){ + }, concurrencyFactor) + .distinct( queueMessage -> { - if(logger.isTraceEnabled()) { - logger.trace("Queueing notification message for device: {}", message.get().getDeviceId()); - } - qm.sendMessage( message.get() ); - queueMeter.mark(); - } + if(queueMessage.isPresent()) { + return queueMessage.get().getNotificationId(); + } + + return queueMessage; // this will always be distinct, default handling for the Optional.empty() case - } catch (IOException e) { + } ) + .doOnNext( message -> { + try { - if(message.isPresent()){ - logger.error("Unable to queue notification for notification UUID {} and device UUID {} ", - message.get().getNotificationId(), message.get().getDeviceId()); - } - else{ - logger.error("Unable to queue notification as it's not present when trying to send to queue"); - } + if(message.isPresent()){ + if(logger.isTraceEnabled()) { + logger.trace("Queueing notification message for device: {}", message.get().getDeviceId()); } + qm.sendMessage( message.get() ); + queueMeter.mark(); + } + } catch (IOException e) { - }).subscribeOn(Schedulers.from(asyncExecutor)); + if(message.isPresent()){ + logger.error("Unable to queue notification for notification UUID {} and device UUID {} ", + message.get().getNotificationId(), message.get().getDeviceId()); + } + else{ + logger.error("Unable to queue notification as it's not present when trying to send to queue"); + } - }, concurrencyFactor) + } + + + }) .doOnError(throwable -> { - logger.error("Error while processing devices for notification : {}", notification.getUuid()); + logger.error("Error while processing devices for notification : {}, error: {}", notification.getUuid(), throwable.getMessage()); notification.setProcessingFinished(-1L); notification.setDeviceProcessedCount(deviceCount.get()); logger.warn("Partial notification. Only {} devices processed for notification {}", @@ -362,7 +372,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }); - processMessagesObservable.subscribeOn(Schedulers.from(asyncExecutor)).subscribe(); // fire the queuing into the background + processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background }