Re-work caching of push notification queue managers such that producing and 
consuming leverage the same cache.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d16f4c17
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d16f4c17
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d16f4c17

Branch: refs/heads/asf-site
Commit: d16f4c17c7b4eda01a8eddf2020139ec3898521d
Parents: c07cdb5
Author: Michael Russo <mru...@apigee.com>
Authored: Sat Apr 30 18:14:40 2016 +0800
Committer: Michael Russo <mru...@apigee.com>
Committed: Sat Apr 30 18:14:40 2016 +0800

----------------------------------------------------------------------
 .../main/resources/usergrid-default.properties  |  10 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  18 ++-
 .../queue/impl/QueueManagerFactoryImpl.java     |  27 ++--
 .../ApplicationQueueManagerCache.java           | 143 +++++++++++++++++++
 .../notifications/NotificationsService.java     |   6 +-
 .../services/notifications/QueueListener.java   |  58 ++------
 .../impl/ApplicationQueueManagerImpl.java       |   8 +-
 7 files changed, 197 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties 
b/stack/config/src/main/resources/usergrid-default.properties
index 5cd7c7a..4f57cdd 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -455,12 +455,20 @@ usergrid.scheduler.job.queueName=/jobs
 
 # Set the number of queue consumers to read from the in-region push 
notification queue.
 #
-usergrid.push.worker_count=8
+usergrid.push.worker_count=2
 
 # Set the sleep time between queue polling ( in milliseconds)
 #
 usergrid.push.sleep=100
 
+# This setting determines the inmemory cache TTL (in minutes) for push 
notifications queue managers.
+#
+usergrid.push.queuemanager.cache.time-to-live=10
+
+# This setting determines the inmemory cache size (# elements) for push 
notifications queue managers.
+#
+usergrid.push.queuemanager.cache.size=200
+
 
 
 ###############################  Usergrid Central SSO  
#############################

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index b26ee46..0853adb 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -303,9 +303,6 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
             logger.trace("Finished compacting {} shards and moved {} edges", 
sourceShards, totalEdgeCount);
         }
 
-        logger.info("Finished compacting {} shards and moved {} edges", 
sourceShards, totalEdgeCount);
-
-
         resultBuilder.withCopiedEdges( totalEdgeCount ).withSourceShards( 
sourceShards ).withTargetShard( targetShard );
 
         /**
@@ -351,8 +348,9 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
             Shard compactedShard = new Shard( targetShard.getShardIndex(), 
timeService.getCurrentTime(), true );
             compactedShard.setShardEnd(targetShard.getShardEnd());
 
-            logger.info( "Shard has been fully compacted.  Marking shard {} as 
compacted in Cassandra", compactedShard );
-
+            if(logger.isTraceEnabled()) {
+                logger.trace("Shard has been fully compacted.  Marking shard 
{} as compacted in Cassandra", compactedShard);
+            }
 
             final MutationBatch updateMark = 
edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
             try {
@@ -402,7 +400,7 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
         }
         catch ( RejectedExecutionException ree ) {
 
-            //ignore, if this happens we don't care, we're saturated, we can 
check later
+            // ignore, if this happens we don't care, we're saturated, we can 
check later
             logger.info( "Rejected audit for shard of scope {} edge, meta {} 
and group {}", scope, edgeMeta, group );
 
             return Futures.immediateFuture( AuditResult.NOT_CHECKED );
@@ -503,8 +501,10 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
                  */
                 try {
                     CompactionResult result = compact( scope, edgeMeta, group 
);
-                    logger.info( "Compaction result for compaction of scope {} 
with edge meta data of {} and shard group {} is {}",
-                            scope, edgeMeta, group, result );
+                    if(logger.isTraceEnabled()) {
+                        logger.trace("Compaction result for compaction of 
scope {} with edge meta data of {} and shard group {} is {}",
+                            scope, edgeMeta, group, result);
+                    }
                 }
                 finally {
                     shardCompactionTaskTracker.complete( scope, edgeMeta, 
group );
@@ -535,8 +535,6 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
                                      ShardEntryGroup group ) {
             final Long hash = doHash( scope, edgeMeta, group ).hash().asLong();
             final Boolean returned = runningTasks.putIfAbsent( hash, TRUE );
-            //logger.info("hash components are app: {}, edgeMeta: {}, group: 
{}", scope.getApplication(), edgeMeta, group);
-            //logger.info("checking hash value of: {}, already started: {}", 
hash, returned );
 
             /**
              * Someone already put the value

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
index de9cac5..93b2fb2 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
@@ -1,21 +1,18 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.usergrid.persistence.queue.impl;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
new file mode 100644
index 0000000..555e495
--- /dev/null
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import com.google.common.cache.*;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import 
org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+
+@Singleton
+public class ApplicationQueueManagerCache{
+
+    private static final Logger logger = LoggerFactory.getLogger( 
ApplicationQueueManagerCache.class );
+
+
+    private final Cache<UUID, ApplicationQueueManager> cache;
+
+    private static final String CACHE_TTL_PROP = 
"usergrid.push.queuemanager.cache.time-to-live";
+    private static final String CACHE_MAX_SIZE_PROP = 
"usergrid.push.queuemanager.cache.size";
+
+    public ApplicationQueueManagerCache(){
+
+        // set a smaller ttl
+        long ttl = 10;
+        int configuredMaxSize;
+
+        try{
+            ttl = Integer.parseInt(System.getProperty(CACHE_TTL_PROP));
+        } catch (NumberFormatException e){
+            // already defaulted to 1 above
+        }
+
+        try{
+            configuredMaxSize = 
Integer.parseInt(System.getProperty(CACHE_MAX_SIZE_PROP));
+        } catch (NumberFormatException e){
+            configuredMaxSize = 200;
+        }
+
+        this.cache = CacheBuilder.newBuilder()
+            .maximumSize(Math.max(1000,configuredMaxSize))
+            .expireAfterWrite(ttl, TimeUnit.MINUTES)
+            .removalListener(new RemovalListener<UUID, 
ApplicationQueueManager>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<UUID, ApplicationQueueManager> 
queueManagerNotifiication) {
+                    try {
+                        if ( queueManagerNotifiication.getValue() != null) {
+                            queueManagerNotifiication.getValue().stop();
+                        }
+                    } catch (Exception ie) {
+                        logger.error("Failed to shutdown push queue manager 
from cache", ie.getMessage());
+                    }
+                }
+            }).build();
+
+    }
+
+    public void put(UUID key, ApplicationQueueManager value){
+
+        cache.put(key, value);
+    }
+
+    public ConcurrentMap<UUID, ApplicationQueueManager> asMap(){
+
+        return cache.asMap();
+    }
+
+    public ApplicationQueueManager get(UUID key){
+        return cache.getIfPresent(key);
+    }
+
+    public void invalidate(UUID key){
+        cache.invalidate(key);
+    }
+
+    public void invalidateAll(){
+        cache.invalidateAll();
+    }
+
+
+    public ApplicationQueueManager getApplicationQueueManager( final 
EntityManager entityManager,
+                                                               final 
QueueManager queueManager,
+                                                               final 
JobScheduler jobScheduler,
+                                                               final 
MetricsFactory metricsService,
+                                                               final 
Properties properties ) {
+
+
+        ApplicationQueueManager manager = 
cache.getIfPresent(entityManager.getApplicationId());
+
+        if(manager != null){
+            if(logger.isTraceEnabled()){
+                logger.trace("Returning push queue manager from cache for 
application: {}", entityManager.getApplicationId());
+            }
+            return manager;
+
+        }else {
+            if(logger.isTraceEnabled()) {
+                logger.trace("Push queue manager not found in cache, loading 
for application: {}", entityManager.getApplicationId());
+            }
+            manager = new ApplicationQueueManagerImpl(
+                jobScheduler,
+                entityManager,
+                queueManager,
+                metricsService,
+                properties
+            );
+
+            cache.put(entityManager.getApplicationId(), manager);
+
+            return manager;
+
+
+        }
+
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 65425d7..907638e 100644
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -77,6 +77,7 @@ public class NotificationsService extends 
AbstractCollectionService {
     private ServiceManagerFactory smf;
     private EntityManagerFactory emf;
     private QueueManagerFactory queueManagerFactory;
+    private ApplicationQueueManagerCache applicationQueueManagerCache;
 
     public NotificationsService() {
         if (logger.isTraceEnabled()) {
@@ -99,7 +100,10 @@ public class NotificationsService extends 
AbstractCollectionService {
         QueueScope queueScope = new QueueScopeImpl( name, 
QueueScope.RegionImplementation.LOCAL);
         queueManagerFactory = getApplicationContext().getBean( Injector.class 
).getInstance(QueueManagerFactory.class);
         QueueManager queueManager = 
queueManagerFactory.getQueueManager(queueScope);
-        notificationQueueManager = new 
ApplicationQueueManagerImpl(jobScheduler,em,queueManager,metricsService,props);
+        applicationQueueManagerCache = 
getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class);
+        notificationQueueManager = applicationQueueManagerCache
+            .getApplicationQueueManager(em,queueManager, jobScheduler, 
metricsService ,props);
+
         gracePeriod = JobScheduler.SCHEDULER_GRACE_PERIOD;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 55d1491..478d5ed 100644
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -57,6 +57,7 @@ public class QueueListener  {
     private ServiceManagerFactory smf;
 
     private EntityManagerFactory emf;
+    private ApplicationQueueManagerCache applicationQueueManagerCache;
 
 
     private Properties properties;
@@ -79,6 +80,8 @@ public class QueueListener  {
         this.emf = emf;
         this.metricsService = smf.getApplicationContext().getBean( 
Injector.class ).getInstance(MetricsFactory.class);
         this.properties = props;
+        this.applicationQueueManagerCache = 
smf.getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class);
+
     }
 
     /**
@@ -86,8 +89,6 @@ public class QueueListener  {
      */
     public void start(){
         //TODO refactor this into a central component that will start/stop 
services
-//        boolean shouldRun = new 
Boolean(properties.getProperty("usergrid.notifications.listener.run", "false"));
-
 
             if (logger.isDebugEnabled()) {
                 logger.debug("QueueListener: starting.");
@@ -166,9 +167,6 @@ public class QueueListener  {
         // run until there are no more active jobs
         final AtomicLong runCount = new AtomicLong(0);
 
-        //cache to retrieve push manager, cached per notifier, so many 
notifications will get same push manager
-        LoadingCache<UUID, ApplicationQueueManager> queueManagerMap = 
getQueueManagerCache(queueManager);
-
         while ( true ) {
 
                 Timer.Context timerContext = timer.time();
@@ -207,7 +205,16 @@ public class QueueListener  {
                                 //send each set of app ids together
                                 for (Map.Entry<UUID, List<QueueMessage>> entry 
: messageMap.entrySet()) {
                                     UUID applicationId = entry.getKey();
-                                    ApplicationQueueManager manager = 
queueManagerMap.get(applicationId);
+
+                                    ApplicationQueueManager manager = 
applicationQueueManagerCache
+                                        .getApplicationQueueManager(
+                                            
emf.getEntityManager(applicationId),
+                                            queueManager,
+                                            new 
JobScheduler(smf.getServiceManager(applicationId), 
emf.getEntityManager(applicationId)),
+                                            metricsService,
+                                            properties
+                                        );
+
                                     if (logger.isTraceEnabled()) {
                                         logger.trace("send batch for app {} of 
{} messages", entry.getKey(), entry.getValue().size());
                                     }
@@ -238,7 +245,7 @@ public class QueueListener  {
                                 }
 
                                 if(runCount.incrementAndGet() % 
consecutiveCallsToRemoveDevices == 0){
-                                    for(ApplicationQueueManager 
applicationQueueManager : queueManagerMap.asMap().values()){
+                                    for(ApplicationQueueManager 
applicationQueueManager : applicationQueueManagerCache.asMap().values()){
                                         try {
                                             
applicationQueueManager.asyncCheckForInactiveDevices();
                                         }catch (Exception 
inactiveDeviceException){
@@ -280,43 +287,6 @@ public class QueueListener  {
         }
     }
 
-    private LoadingCache<UUID, ApplicationQueueManager> 
getQueueManagerCache(final QueueManager queueManager) {
-        return CacheBuilder
-                    .newBuilder()
-                    .expireAfterAccess(10, TimeUnit.MINUTES)
-                    .removalListener(new RemovalListener<UUID, 
ApplicationQueueManager>() {
-                        @Override
-                        public void onRemoval(
-                                RemovalNotification<UUID, 
ApplicationQueueManager> queueManagerNotifiication) {
-                            try {
-                                queueManagerNotifiication.getValue().stop();
-                            } catch (Exception ie) {
-                                logger.error("Failed to shutdown from cache", 
ie);
-                            }
-                        }
-                    }).build(new CacheLoader<UUID, ApplicationQueueManager>() {
-                         @Override
-                         public ApplicationQueueManager load(final UUID 
applicationId) {
-                             try {
-                                 EntityManager entityManager = 
emf.getEntityManager(applicationId);
-                                 ServiceManager serviceManager = 
smf.getServiceManager(applicationId);
-
-                                 ApplicationQueueManagerImpl manager = new 
ApplicationQueueManagerImpl(
-                                         new JobScheduler(serviceManager, 
entityManager),
-                                         entityManager,
-                                         queueManager,
-                                         metricsService,
-                                         properties
-                                 );
-                                 return manager;
-                             } catch (Exception e) {
-                                 logger.error("Could not instantiate queue 
manager", e);
-                                 return null;
-                             }
-                         }
-                     });
-    }
-
     public void stop(){
         if (logger.isDebugEnabled()) {
             logger.debug("stop processes");

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 80e8cbe..eb5d794 100644
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -141,7 +141,9 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
                 logger.trace("notification {} start query", 
notification.getUuid());
             }
 
-            logger.info("Notification {} started processing", 
notification.getUuid());
+            if(logger.isTraceEnabled()) {
+                logger.trace("Notification {} started processing", 
notification.getUuid());
+            }
 
 
 
@@ -366,7 +368,9 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
                         
notification.setProcessingFinished(System.currentTimeMillis());
                         
notification.setDeviceProcessedCount(deviceCount.get());
                         em.update(notification);
-                        logger.info("Notification {} finished processing {} 
device(s)", notification.getUuid(), deviceCount.get());
+                        if(logger.isTraceEnabled()) {
+                            logger.trace("Notification {} finished processing 
{} device(s)", notification.getUuid(), deviceCount.get());
+                        }
 
                     } catch (Exception e) {
                         logger.error("Unable to set processing finished 
timestamp for notification");

Reply via email to