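Fix scheduler: run notification queue processing on a dedicated Rx scheduler backed by the bounded "push-device-io" task executor (caller-runs on rejection) instead of Schedulers.io().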
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/938bef0d Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/938bef0d Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/938bef0d Branch: refs/heads/master Commit: 938bef0d02c6f029319e4ea1632c13c7bf74a447 Parents: df9abc4 Author: Michael Russo <mru...@apigee.com> Authored: Wed Apr 20 18:24:59 2016 -0700 Committer: Michael Russo <mru...@apigee.com> Committed: Wed Apr 20 18:24:59 2016 -0700 ---------------------------------------------------------------------- .../impl/ApplicationQueueManagerImpl.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/938bef0d/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index 778307c..4b2612f 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -29,6 +29,7 @@ import org.apache.usergrid.services.notifications.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; +import rx.Scheduler; import rx.Subscriber; import rx.functions.Func1; import rx.schedulers.Schedulers; @@ -62,7 +63,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { - //private final ExecutorService asyncExecutor; + private final Scheduler scheduler; @@ -78,7 +79,7 @@ public class 
ApplicationQueueManagerImpl implements ApplicationQueueManager { this.queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue"); this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send"); - /** + int maxAsyncThreads; int workerQueueSize; @@ -99,11 +100,11 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks - this.asyncExecutor = TaskExecutorFactory + this.scheduler = Schedulers.from(TaskExecutorFactory .createTaskExecutor( "push-device-io", maxAsyncThreads, workerQueueSize, - TaskExecutorFactory.RejectionAction.CALLERRUNS ); + TaskExecutorFactory.RejectionAction.CALLERRUNS )); + - **/ } private boolean scheduleQueueJob(Notification notification) throws Exception { @@ -306,7 +307,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }) .map(sendMessageFunction) - .subscribeOn(Schedulers.io()); + .subscribeOn(scheduler); }, concurrencyFactor) .distinct( queueMessage -> { @@ -314,7 +315,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { if(queueMessage.isPresent()) { return queueMessage.get().getNotificationId(); } - + return queueMessage; // this will always be distinct, default handling for the Optional.empty() case } ) @@ -372,7 +373,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }); - processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background + processMessagesObservable.subscribeOn(scheduler).subscribe(); // fire the queuing into the background }