Back to Schedulers.io()
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/949f71be Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/949f71be Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/949f71be Branch: refs/heads/master Commit: 949f71be2b2f8dd6352d82195a72f8c3f31c32f1 Parents: e76e65d Author: Michael Russo <mru...@apigee.com> Authored: Wed Apr 20 19:42:44 2016 -0700 Committer: Michael Russo <mru...@apigee.com> Committed: Wed Apr 20 19:42:44 2016 -0700 ---------------------------------------------------------------------- .../notifications/impl/ApplicationQueueManagerImpl.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/949f71be/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index fa8c8a2..44ccf2b 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -64,7 +64,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { - private final Scheduler scheduler; + //private final Scheduler scheduler; @@ -79,8 +79,10 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { this.queueName = getQueueNames(properties); this.queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue"); this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send"); + this.concurrencyFactor = Integer.valueOf(System.getProperty(PUSH_PROCESSING_CONCURRENCY_PROP, "50")); + /** int maxAsyncThreads; int workerQueueSize; @@ -88,7 +90,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { maxAsyncThreads = Integer.valueOf(System.getProperty(PUSH_PROCESSING_MAXTHREADS_PROP, "200")); workerQueueSize = Integer.valueOf(System.getProperty(PUSH_PROCESSING_QUEUESIZE_PROP, "2000")); - this.concurrencyFactor = Integer.valueOf(System.getProperty(PUSH_PROCESSING_CONCURRENCY_PROP, "50")); } catch (Exception e){ @@ -104,7 +105,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { this.scheduler = Schedulers.from(TaskExecutorFactory .createTaskExecutor( "push-device-io", maxAsyncThreads, workerQueueSize, TaskExecutorFactory.RejectionAction.CALLERRUNS )); - + **/ } @@ -308,7 +309,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }) .map(sendMessageFunction) - .subscribeOn(scheduler); + .subscribeOn(Schedulers.io()); }, concurrencyFactor) .distinct( queueMessage -> { @@ -374,7 +375,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }); - processMessagesObservable.subscribeOn(scheduler).subscribe(); // fire the queuing into the background + processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background }