Re-work caching of push notification queue managers such that producing and consuming leverage the same cache.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d16f4c17 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d16f4c17 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d16f4c17 Branch: refs/heads/asf-site Commit: d16f4c17c7b4eda01a8eddf2020139ec3898521d Parents: c07cdb5 Author: Michael Russo <mru...@apigee.com> Authored: Sat Apr 30 18:14:40 2016 +0800 Committer: Michael Russo <mru...@apigee.com> Committed: Sat Apr 30 18:14:40 2016 +0800 ---------------------------------------------------------------------- .../main/resources/usergrid-default.properties | 10 +- .../shard/impl/ShardGroupCompactionImpl.java | 18 ++- .../queue/impl/QueueManagerFactoryImpl.java | 27 ++-- .../ApplicationQueueManagerCache.java | 143 +++++++++++++++++++ .../notifications/NotificationsService.java | 6 +- .../services/notifications/QueueListener.java | 58 ++------ .../impl/ApplicationQueueManagerImpl.java | 8 +- 7 files changed, 197 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/config/src/main/resources/usergrid-default.properties ---------------------------------------------------------------------- diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties index 5cd7c7a..4f57cdd 100644 --- a/stack/config/src/main/resources/usergrid-default.properties +++ b/stack/config/src/main/resources/usergrid-default.properties @@ -455,12 +455,20 @@ usergrid.scheduler.job.queueName=/jobs # Set the number of queue consumers to read from the in-region push notification queue. # -usergrid.push.worker_count=8 +usergrid.push.worker_count=2 # Set the sleep time between queue polling ( in milliseconds) # usergrid.push.sleep=100 +# This setting determines the inmemory cache TTL (in minutes) for push notifications queue managers. +# +usergrid.push.queuemanager.cache.time-to-live=10 + +# This setting determines the inmemory cache size (# elements) for push notifications queue managers. +# +usergrid.push.queuemanager.cache.size=200 + ############################### Usergrid Central SSO ############################# http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java index b26ee46..0853adb 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java @@ -303,9 +303,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount); } - logger.info("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount); - - resultBuilder.withCopiedEdges( totalEdgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard ); /** @@ -351,8 +348,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true ); compactedShard.setShardEnd(targetShard.getShardEnd()); - logger.info( "Shard has been fully compacted. Marking shard {} as compacted in Cassandra", compactedShard ); - + if(logger.isTraceEnabled()) { + logger.trace("Shard has been fully compacted. Marking shard {} as compacted in Cassandra", compactedShard); + } final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta ); try { @@ -402,7 +400,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { } catch ( RejectedExecutionException ree ) { - //ignore, if this happens we don't care, we're saturated, we can check later + // ignore, if this happens we don't care, we're saturated, we can check later logger.info( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group ); return Futures.immediateFuture( AuditResult.NOT_CHECKED ); @@ -503,8 +501,10 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { */ try { CompactionResult result = compact( scope, edgeMeta, group ); - logger.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}", - scope, edgeMeta, group, result ); + if(logger.isTraceEnabled()) { + logger.trace("Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}", + scope, edgeMeta, group, result); + } } finally { shardCompactionTaskTracker.complete( scope, edgeMeta, group ); @@ -535,8 +535,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { ShardEntryGroup group ) { final Long hash = doHash( scope, edgeMeta, group ).hash().asLong(); final Boolean returned = runningTasks.putIfAbsent( hash, TRUE ); - //logger.info("hash components are app: {}, edgeMeta: {}, group: {}", scope.getApplication(), edgeMeta, group); - //logger.info("checking hash value of: {}, already started: {}", hash, returned ); /** * Someone already put the value http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java index de9cac5..93b2fb2 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java @@ -1,21 +1,18 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. The ASF licenses this file to You - * * under the Apache License, Version 2.0 (the "License"); you may not - * * use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. For additional information regarding - * * copyright in this work, please see the NOTICE file in the top level - * * directory of this distribution. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.usergrid.persistence.queue.impl; http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java new file mode 100644 index 0000000..555e495 --- /dev/null +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.usergrid.services.notifications; + +import com.google.common.cache.*; +import com.google.inject.Singleton; +import org.apache.usergrid.persistence.EntityManager; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.queue.QueueManager; +import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + + +@Singleton +public class ApplicationQueueManagerCache{ + + private static final Logger logger = LoggerFactory.getLogger( ApplicationQueueManagerCache.class ); + + + private final Cache<UUID, ApplicationQueueManager> cache; + + private static final String CACHE_TTL_PROP = "usergrid.push.queuemanager.cache.time-to-live"; + private static final String CACHE_MAX_SIZE_PROP = "usergrid.push.queuemanager.cache.size"; + + public ApplicationQueueManagerCache(){ + + // set a smaller ttl + long ttl = 10; + int configuredMaxSize; + + try{ + ttl = Integer.parseInt(System.getProperty(CACHE_TTL_PROP)); + } catch (NumberFormatException e){ + // already defaulted to 1 above + } + + try{ + configuredMaxSize = Integer.parseInt(System.getProperty(CACHE_MAX_SIZE_PROP)); + } catch (NumberFormatException e){ + configuredMaxSize = 200; + } + + this.cache = CacheBuilder.newBuilder() + .maximumSize(Math.max(1000,configuredMaxSize)) + .expireAfterWrite(ttl, TimeUnit.MINUTES) + .removalListener(new RemovalListener<UUID, ApplicationQueueManager>() { + @Override + public void onRemoval( + RemovalNotification<UUID, ApplicationQueueManager> queueManagerNotifiication) { + try { + if ( queueManagerNotifiication.getValue() != null) { + queueManagerNotifiication.getValue().stop(); + } + } catch (Exception ie) { + logger.error("Failed to shutdown push queue manager from cache", ie.getMessage()); + } + } + }).build(); + + } + + public void put(UUID key, ApplicationQueueManager value){ + + cache.put(key, value); + } + + public ConcurrentMap<UUID, ApplicationQueueManager> asMap(){ + + return cache.asMap(); + } + + public ApplicationQueueManager get(UUID key){ + return cache.getIfPresent(key); + } + + public void invalidate(UUID key){ + cache.invalidate(key); + } + + public void invalidateAll(){ + cache.invalidateAll(); + } + + + public ApplicationQueueManager getApplicationQueueManager( final EntityManager entityManager, + final QueueManager queueManager, + final JobScheduler jobScheduler, + final MetricsFactory metricsService, + final Properties properties ) { + + + ApplicationQueueManager manager = cache.getIfPresent(entityManager.getApplicationId()); + + if(manager != null){ + if(logger.isTraceEnabled()){ + logger.trace("Returning push queue manager from cache for application: {}", entityManager.getApplicationId()); + } + return manager; + + }else { + if(logger.isTraceEnabled()) { + logger.trace("Push queue manager not found in cache, loading for application: {}", entityManager.getApplicationId()); + } + manager = new ApplicationQueueManagerImpl( + jobScheduler, + entityManager, + queueManager, + metricsService, + properties + ); + + cache.put(entityManager.getApplicationId(), manager); + + return manager; + + + } + + + } + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java index 65425d7..907638e 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java @@ -77,6 +77,7 @@ public class NotificationsService extends AbstractCollectionService { private ServiceManagerFactory smf; private EntityManagerFactory emf; private QueueManagerFactory queueManagerFactory; + private ApplicationQueueManagerCache applicationQueueManagerCache; public NotificationsService() { if (logger.isTraceEnabled()) { @@ -99,7 +100,10 @@ public class NotificationsService extends AbstractCollectionService { QueueScope queueScope = new QueueScopeImpl( name, QueueScope.RegionImplementation.LOCAL); queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class); QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope); - notificationQueueManager = new ApplicationQueueManagerImpl(jobScheduler,em,queueManager,metricsService,props); + applicationQueueManagerCache = getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class); + notificationQueueManager = applicationQueueManagerCache + .getApplicationQueueManager(em,queueManager, jobScheduler, metricsService ,props); + gracePeriod = JobScheduler.SCHEDULER_GRACE_PERIOD; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java index 55d1491..478d5ed 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java @@ -57,6 +57,7 @@ public class QueueListener { private ServiceManagerFactory smf; private EntityManagerFactory emf; + private ApplicationQueueManagerCache applicationQueueManagerCache; private Properties properties; @@ -79,6 +80,8 @@ public class QueueListener { this.emf = emf; this.metricsService = smf.getApplicationContext().getBean( Injector.class ).getInstance(MetricsFactory.class); this.properties = props; + this.applicationQueueManagerCache = smf.getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class); + } /** @@ -86,8 +89,6 @@ public class QueueListener { */ public void start(){ //TODO refactor this into a central component that will start/stop services -// boolean shouldRun = new Boolean(properties.getProperty("usergrid.notifications.listener.run", "false")); - if (logger.isDebugEnabled()) { logger.debug("QueueListener: starting."); @@ -166,9 +167,6 @@ public class QueueListener { // run until there are no more active jobs final AtomicLong runCount = new AtomicLong(0); - //cache to retrieve push manager, cached per notifier, so many notifications will get same push manager - LoadingCache<UUID, ApplicationQueueManager> queueManagerMap = getQueueManagerCache(queueManager); - while ( true ) { Timer.Context timerContext = timer.time(); @@ -207,7 +205,16 @@ public class QueueListener { //send each set of app ids together for (Map.Entry<UUID, List<QueueMessage>> entry : messageMap.entrySet()) { UUID applicationId = entry.getKey(); - ApplicationQueueManager manager = queueManagerMap.get(applicationId); + + ApplicationQueueManager manager = applicationQueueManagerCache + .getApplicationQueueManager( + emf.getEntityManager(applicationId), + queueManager, + new JobScheduler(smf.getServiceManager(applicationId), emf.getEntityManager(applicationId)), + metricsService, + properties + ); + if (logger.isTraceEnabled()) { logger.trace("send batch for app {} of {} messages", entry.getKey(), entry.getValue().size()); } @@ -238,7 +245,7 @@ public class QueueListener { } if(runCount.incrementAndGet() % consecutiveCallsToRemoveDevices == 0){ - for(ApplicationQueueManager applicationQueueManager : queueManagerMap.asMap().values()){ + for(ApplicationQueueManager applicationQueueManager : applicationQueueManagerCache.asMap().values()){ try { applicationQueueManager.asyncCheckForInactiveDevices(); }catch (Exception inactiveDeviceException){ @@ -280,43 +287,6 @@ public class QueueListener { } } - private LoadingCache<UUID, ApplicationQueueManager> getQueueManagerCache(final QueueManager queueManager) { - return CacheBuilder - .newBuilder() - .expireAfterAccess(10, TimeUnit.MINUTES) - .removalListener(new RemovalListener<UUID, ApplicationQueueManager>() { - @Override - public void onRemoval( - RemovalNotification<UUID, ApplicationQueueManager> queueManagerNotifiication) { - try { - queueManagerNotifiication.getValue().stop(); - } catch (Exception ie) { - logger.error("Failed to shutdown from cache", ie); - } - } - }).build(new CacheLoader<UUID, ApplicationQueueManager>() { - @Override - public ApplicationQueueManager load(final UUID applicationId) { - try { - EntityManager entityManager = emf.getEntityManager(applicationId); - ServiceManager serviceManager = smf.getServiceManager(applicationId); - - ApplicationQueueManagerImpl manager = new ApplicationQueueManagerImpl( - new JobScheduler(serviceManager, entityManager), - entityManager, - queueManager, - metricsService, - properties - ); - return manager; - } catch (Exception e) { - logger.error("Could not instantiate queue manager", e); - return null; - } - } - }); - } - public void stop(){ if (logger.isDebugEnabled()) { logger.debug("stop processes"); http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index 80e8cbe..eb5d794 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -141,7 +141,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { logger.trace("notification {} start query", notification.getUuid()); } - logger.info("Notification {} started processing", notification.getUuid()); + if(logger.isTraceEnabled()) { + logger.trace("Notification {} started processing", notification.getUuid()); + } @@ -366,7 +368,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { notification.setProcessingFinished(System.currentTimeMillis()); notification.setDeviceProcessedCount(deviceCount.get()); em.update(notification); - logger.info("Notification {} finished processing {} device(s)", notification.getUuid(), deviceCount.get()); + if(logger.isTraceEnabled()) { + logger.trace("Notification {} finished processing {} device(s)", notification.getUuid(), deviceCount.get()); + } } catch (Exception e) { logger.error("Unable to set processing finished timestamp for notification");