Remove explicit init message and add option to turn off in-memory queue cache.

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/522a5515
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/522a5515
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/522a5515

Branch: refs/heads/usergrid-1318-queue
Commit: 522a5515baf016bafaf6ee5e819dfa4437fc1412
Parents: 521047c
Author: Dave Johnson <snoopd...@apache.org>
Authored: Thu Nov 3 17:56:03 2016 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Thu Nov 3 17:56:03 2016 -0400

----------------------------------------------------------------------
 .../qakka/core/impl/QueueManagerImpl.java       |   4 +-
 .../distributed/DistributedQueueService.java    |   4 -
 .../qakka/distributed/actors/QueueActor.java    |  93 ++------------
 .../distributed/actors/QueueActorHelper.java    | 126 ++++++++++++++++++-
 .../distributed/actors/QueueActorRouter.java    |  76 ++++++++++-
 .../distributed/actors/QueueRefresher.java      |  95 +-------------
 .../impl/DistributedQueueServiceImpl.java       |  55 +++-----
 7 files changed, 227 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
index d51fe2d..88d307c 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
@@ -96,7 +96,7 @@ public class QueueManagerImpl implements QueueManager {
         messageCounterSerialization.incrementCounter( queue.getName(), 
DatabaseQueueMessage.Type.DEFAULT, 0L );
         messageCounterSerialization.incrementCounter( queue.getName(), 
DatabaseQueueMessage.Type.INFLIGHT, 0L );
 
-        distributedQueueService.initQueue( queue.getName() );
+        //distributedQueueService.initQueue( queue.getName() );
         distributedQueueService.refreshQueue( queue.getName() );
     }
 
@@ -105,7 +105,7 @@ public class QueueManagerImpl implements QueueManager {
 
         queueSerialization.writeQueue(queue.toDatabaseQueue());
 
-        distributedQueueService.initQueue( queue.getName() );
+        //distributedQueueService.initQueue( queue.getName() );
         distributedQueueService.refreshQueue( queue.getName() );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
index b11dcff..984cea2 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
@@ -34,8 +34,6 @@ public interface DistributedQueueService {
 
     void init();
 
-    void initQueue(String queueName);
-
     void refresh();
 
     void shutdown();
@@ -57,6 +55,4 @@ public interface DistributedQueueService {
     Status ackMessage(String queueName, UUID messageId);
 
     Status requeueMessage(String queueName, UUID messageId);
-
-    Status clearMessages(String queueName);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 248f9cd..92e8607 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -20,121 +20,52 @@
 package org.apache.usergrid.persistence.qakka.distributed.actors;
 
 import akka.actor.ActorRef;
-import akka.actor.Cancellable;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
-import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.MetricsService;
-import org.apache.usergrid.persistence.qakka.QakkaFig;
-import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
 import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
 
-import java.text.DecimalFormat;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 
 public class QueueActor extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( 
QueueActor.class );
 
-    private final QakkaFig         qakkaFig;
-    private final InMemoryQueue    inMemoryQueue;
     private final QueueActorHelper queueActorHelper;
     private final MetricsService   metricsService;
-    private final MessageCounterSerialization messageCounterSerialization;
 
-    private final Map<String, Cancellable> refreshSchedulersByQueueName = new 
HashMap<>();
-    private final Map<String, Cancellable> timeoutSchedulersByQueueName = new 
HashMap<>();
-    private final Map<String, Cancellable> 
shardAllocationSchedulersByQueueName = new HashMap<>();
-
-    private final Map<String, ActorRef> queueReadersByQueueName    = new 
HashMap<>();
+    //private final Map<String, ActorRef> queueReadersByQueueName    = new 
HashMap<>();
     private final Map<String, ActorRef> queueTimeoutersByQueueName = new 
HashMap<>();
     private final Map<String, ActorRef> shardAllocatorsByQueueName = new 
HashMap<>();
 
-    private final Set<String> queuesSeen = new HashSet<>();
-
 
     @Inject
     public QueueActor(
-        QakkaFig         qakkaFig,
-        InMemoryQueue    inMemoryQueue,
         QueueActorHelper queueActorHelper,
-        MetricsService   metricsService,
-        MessageCounterSerialization messageCounterSerialization
+        MetricsService   metricsService
     ) {
-        this.qakkaFig = qakkaFig;
-        this.inMemoryQueue = inMemoryQueue;
         this.queueActorHelper = queueActorHelper;
         this.metricsService = metricsService;
-        this.messageCounterSerialization = messageCounterSerialization;
     }
 
 
     @Override
     public void onReceive(Object message) {
 
-        if ( message instanceof QueueInitRequest) {
-            QueueInitRequest request = (QueueInitRequest)message;
-
-            queuesSeen.add( request.getQueueName() );
-
-            if ( refreshSchedulersByQueueName.get( request.getQueueName() ) == 
null ) {
-                Cancellable scheduler = 
getContext().system().scheduler().schedule(
-                    Duration.create( 0, TimeUnit.MILLISECONDS),
-                    Duration.create( qakkaFig.getQueueRefreshMilliseconds(), 
TimeUnit.MILLISECONDS),
-                    self(),
-                    new QueueRefreshRequest( request.getQueueName(), false ),
-                    getContext().dispatcher(),
-                    getSelf());
-                refreshSchedulersByQueueName.put( request.getQueueName(), 
scheduler );
-                logger.debug("Created refresher for queue {}", 
request.getQueueName() );
-            }
-
-            if ( timeoutSchedulersByQueueName.get( request.getQueueName() ) == 
null ) {
-                Cancellable scheduler = 
getContext().system().scheduler().schedule(
-                        Duration.create( 0, TimeUnit.MILLISECONDS),
-                        Duration.create( qakkaFig.getQueueTimeoutSeconds()/2, 
TimeUnit.SECONDS),
-                        self(),
-                        new QueueTimeoutRequest( request.getQueueName() ),
-                        getContext().dispatcher(),
-                        getSelf());
-                timeoutSchedulersByQueueName.put( request.getQueueName(), 
scheduler );
-                logger.debug("Created timeouter for queue {}", 
request.getQueueName() );
-            }
-
-            if ( shardAllocationSchedulersByQueueName.get( 
request.getQueueName() ) == null ) {
-                Cancellable scheduler = 
getContext().system().scheduler().schedule(
-                        Duration.create( 0, TimeUnit.MILLISECONDS),
-                        Duration.create( 
qakkaFig.getShardAllocationCheckFrequencyMillis(), TimeUnit.MILLISECONDS),
-                        self(),
-                        new ShardCheckRequest( request.getQueueName() ),
-                        getContext().dispatcher(),
-                        getSelf());
-                shardAllocationSchedulersByQueueName.put( 
request.getQueueName(), scheduler );
-                logger.debug("Created shard allocater for queue {}", 
request.getQueueName() );
-            }
-
-
-        } else if ( message instanceof QueueRefreshRequest ) {
+        if ( message instanceof QueueRefreshRequest ) {
             QueueRefreshRequest request = (QueueRefreshRequest)message;
-            queuesSeen.add( request.getQueueName() );
-
-//            // NOT asynchronous
-//            queueActorHelper.queueRefresh( request.getQueueName() );
 
-            if ( queueReadersByQueueName.get( request.getQueueName() ) == null 
) {
+            // NOT asynchronous because we want this to happen locally in this 
JVM
+            queueActorHelper.queueRefresh( request.getQueueName() );
 
+            /* if ( queueReadersByQueueName.get( request.getQueueName() ) == 
null ) {
                 if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( 
request.getQueueName()) == null ) {
                     ActorRef readerRef = getContext().actorOf(
                         Props.create( GuiceActorProducer.class, 
QueueRefresher.class ),
@@ -142,16 +73,13 @@ public class QueueActor extends UntypedActor {
                     queueReadersByQueueName.put( request.getQueueName(), 
readerRef );
                 }
             }
-
             // hand-off to queue's reader
-            queueReadersByQueueName.get( request.getQueueName() ).tell( 
request, self() );
+            queueReadersByQueueName.get( request.getQueueName() ).tell( 
request, self() ); */
 
 
         } else if ( message instanceof QueueTimeoutRequest ) {
             QueueTimeoutRequest request = (QueueTimeoutRequest)message;
 
-            queuesSeen.add( request.getQueueName() );
-
             if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == 
null ) {
                 ActorRef readerRef = getContext().actorOf(
                     Props.create( GuiceActorProducer.class, 
QueueTimeouter.class),
@@ -166,8 +94,6 @@ public class QueueActor extends UntypedActor {
         } else if ( message instanceof ShardCheckRequest ) {
             ShardCheckRequest request = (ShardCheckRequest)message;
 
-            queuesSeen.add( request.getQueueName() );
-
             if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == 
null ) {
                 ActorRef readerRef = getContext().actorOf(
                     Props.create( GuiceActorProducer.class, 
ShardAllocator.class),
@@ -186,12 +112,11 @@ public class QueueActor extends UntypedActor {
             String queueName = queueGetRequest.getQueueName();
             int numRequested = queueGetRequest.getNumRequested();
 
-            queuesSeen.add( queueName );
-
             Timer.Context timer = metricsService.getMetricRegistry().timer( 
MetricsService.GET_TIME_GET ).time();
             try {
 
                 Collection<DatabaseQueueMessage> messages = 
queueActorHelper.getMessages( queueName, numRequested);
+                logger.trace("Returning queue {} messages {}", queueName, 
messages.size() );
 
                 getSender().tell( new QueueGetResponse(
                         DistributedQueueService.Status.SUCCESS, messages, 
queueName ), getSender() );
@@ -206,4 +131,6 @@ public class QueueActor extends UntypedActor {
 
     }
 
+
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 6382661..4696a67 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -21,6 +21,7 @@ package 
org.apache.usergrid.persistence.qakka.distributed.actors;
 
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -37,12 +38,10 @@ import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterato
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
 
 
+@Singleton
 public class QueueActorHelper {
     private static final Logger logger = LoggerFactory.getLogger( 
QueueActorHelper.class );
 
@@ -54,6 +53,8 @@ public class QueueActorHelper {
     private final MetricsService            metricsService;
     private final CassandraClient           cassandraClient;
 
+    Map<String, Long> startingShards = new HashMap<>();
+
 
     @Inject
     public QueueActorHelper(
@@ -97,6 +98,16 @@ public class QueueActorHelper {
 
     Collection<DatabaseQueueMessage> getMessages(String queueName, int 
numRequested ) {
 
+        if ( qakkaFig.getInMemoryCache() ) {
+            return getMessagesFromMemory( queueName, numRequested );
+        } else {
+            return getMessagesFromStorage( queueName, numRequested );
+        }
+    }
+
+
+    Collection<DatabaseQueueMessage> getMessagesFromMemory(String queueName, 
int numRequested ) {
+
         Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>();
 
         while (queueMessages.size() < numRequested) {
@@ -117,7 +128,51 @@ public class QueueActorHelper {
 
         //logger.debug("{} returning {} for queue {}", this, 
queueMessages.size(), queueName);
         return queueMessages;
+    }
+
+
+    Collection<DatabaseQueueMessage> getMessagesFromStorage(String queueName, 
int numRequested ) {
+
+        Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>();
+
+        final Optional shardIdOptional;
+        final String shardKey =
+            createShardKey( queueName, Shard.Type.DEFAULT, 
actorSystemFig.getRegionLocal() );
+
+        Long shardId = startingShards.get( shardKey );
+        if ( shardId != null ) {
+            shardIdOptional = Optional.of( shardId );
+        } else {
+            shardIdOptional = Optional.empty();
+        }
+
+        String region = actorSystemFig.getRegionLocal();
+
+        ShardIterator shardIterator = new ShardIterator(
+            cassandraClient, queueName, region, Shard.Type.DEFAULT, 
shardIdOptional );
+
+        MultiShardMessageIterator multiShardIterator = new 
MultiShardMessageIterator(
+            cassandraClient, queueName, region, 
DatabaseQueueMessage.Type.DEFAULT, shardIterator, null);
+
+        int count = 0;
+
+        while ( multiShardIterator.hasNext() && count < numRequested ) {
+            DatabaseQueueMessage queueMessage = multiShardIterator.next();
 
+            if ( queueMessage != null && putInflight( queueMessage ) ) {
+                queueMessages.add( queueMessage );
+                count++;
+            }
+        }
+
+        Shard currentShard = multiShardIterator.getCurrentShard();
+        if ( currentShard != null ) {
+            shardId = currentShard.getShardId();
+            startingShards.put( shardKey, shardId );
+        }
+
+        //logger.debug("{} returning {} for queue {}", this, 
queueMessages.size(), queueName);
+        return queueMessages;
     }
 
 
@@ -208,4 +263,67 @@ public class QueueActorHelper {
             return DistributedQueueService.Status.ERROR;
         }
     }
+
+
+    void queueRefresh( String queueName ) {
+
+        Timer.Context timer = metricsService.getMetricRegistry().timer( 
MetricsService.REFRESH_TIME).time();
+
+        try {
+
+            if (inMemoryQueue.size( queueName ) < 
qakkaFig.getQueueInMemorySize()) {
+
+                final Optional shardIdOptional;
+                final String shardKey =
+                    createShardKey( queueName, Shard.Type.DEFAULT, 
actorSystemFig.getRegionLocal() );
+                Long shardId = startingShards.get( shardKey );
+
+                if ( shardId != null ) {
+                    shardIdOptional = Optional.of( shardId );
+                } else {
+                    shardIdOptional = Optional.empty();
+                }
+
+                ShardIterator shardIterator = new ShardIterator(
+                    cassandraClient, queueName, 
actorSystemFig.getRegionLocal(),
+                    Shard.Type.DEFAULT, shardIdOptional );
+
+                UUID since = inMemoryQueue.getNewest( queueName );
+
+                String region = actorSystemFig.getRegionLocal();
+
+                MultiShardMessageIterator multiShardIterator = new 
MultiShardMessageIterator(
+                    cassandraClient, queueName, region, 
DatabaseQueueMessage.Type.DEFAULT,
+                    shardIterator, since);
+
+
+                int need = qakkaFig.getQueueInMemorySize() - 
inMemoryQueue.size( queueName );
+                int count = 0;
+
+                while ( multiShardIterator.hasNext() && count < need ) {
+                    DatabaseQueueMessage queueMessage = 
multiShardIterator.next();
+                    inMemoryQueue.add( queueName, queueMessage );
+                    count++;
+                }
+
+                Shard currentShard = multiShardIterator.getCurrentShard();
+                if ( currentShard != null ) {
+                    shardId = currentShard.getShardId();
+                    startingShards.put( shardKey, shardId );
+                }
+
+                logger.trace("Refreshed queue {} region {} shard {} since {} 
found {}",
+                    queueName, region, shardId, since, count );
+            }
+
+        } finally {
+            timer.close();
+        }
+
+    }
+
+    private String createShardKey(String queueName, Shard.Type type, String 
region ) {
+        return queueName + "_" + type + region;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index b5b9c30..71cf332 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.qakka.distributed.actors;
 
 import akka.actor.ActorRef;
+import akka.actor.Cancellable;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.routing.ConsistentHashingRouter;
@@ -27,23 +28,43 @@ import akka.routing.FromConfig;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
 import 
org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer;
 import org.apache.usergrid.persistence.qakka.distributed.messages.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 
 /**
  * Use consistent hashing to route messages to QueueActors
  */
 public class QueueActorRouter extends UntypedActor {
+    private static final Logger logger = LoggerFactory.getLogger( 
QueueActorRouter.class );
 
     private final ActorRef routerRef;
     private final QueueActorRouterProducer queueActorRouterProducer;
 
+    private final QakkaFig qakkaFig;
+    private final Set<String> queuesSeen = new HashSet<>();
+
+    private final Map<String, Cancellable> refreshSchedulersByQueueName = new 
HashMap<>();
+    private final Map<String, Cancellable> timeoutSchedulersByQueueName = new 
HashMap<>();
+    private final Map<String, Cancellable> 
shardAllocationSchedulersByQueueName = new HashMap<>();
+
 
     @Inject
-    public QueueActorRouter( QueueActorRouterProducer queueActorRouterProducer 
) {
+    public QueueActorRouter( QueueActorRouterProducer 
queueActorRouterProducer, QakkaFig qakkaFig ) {
 
         this.queueActorRouterProducer = queueActorRouterProducer;
+        this.qakkaFig = qakkaFig;
 
         this.routerRef = getContext().actorOf( FromConfig.getInstance().props(
             Props.create( GuiceActorProducer.class, QueueActor.class)
@@ -56,6 +77,10 @@ public class QueueActorRouter extends UntypedActor {
         if ( queueActorRouterProducer.getMessageTypes().contains( 
message.getClass() ) ) {
             QakkaMessage qakkaMessage = (QakkaMessage) message;
 
+            if ( qakkaMessage.getQueueName() != null ) {
+                initIfNeeded( qakkaMessage.getQueueName() );
+            }
+
             ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
                     new ConsistentHashingRouter.ConsistentHashableEnvelope( 
message, qakkaMessage.getQueueName() );
             routerRef.tell( envelope, getSender() );
@@ -64,4 +89,53 @@ public class QueueActorRouter extends UntypedActor {
             unhandled(message);
         }
     }
+
+
+    /**
+     * Create scheduled refresh, timeout and shard-allocation tasks just in 
time.
+     */
+    private void initIfNeeded( String queueName ) {
+
+        if (!queuesSeen.contains( queueName )) {
+
+            queuesSeen.add( queueName );
+
+            if ( qakkaFig.getInMemoryCache() && 
refreshSchedulersByQueueName.get( queueName ) == null) {
+                Cancellable scheduler = 
getContext().system().scheduler().schedule(
+                    Duration.create( 0, TimeUnit.MILLISECONDS ),
+                    Duration.create( qakkaFig.getQueueRefreshMilliseconds(), 
TimeUnit.MILLISECONDS ),
+                    self(),
+                    new QueueRefreshRequest( queueName, false ),
+                    getContext().dispatcher(),
+                    getSelf() );
+                refreshSchedulersByQueueName.put( queueName, scheduler );
+                logger.debug( "Created refresher for queue {}", queueName );
+            }
+
+            if ( timeoutSchedulersByQueueName.get( queueName ) == null) {
+                Cancellable scheduler = 
getContext().system().scheduler().schedule(
+                    Duration.create( 0, TimeUnit.MILLISECONDS ),
+                    Duration.create( qakkaFig.getQueueTimeoutSeconds() / 2, 
TimeUnit.SECONDS ),
+                    self(),
+                    new QueueTimeoutRequest( queueName ),
+                    getContext().dispatcher(),
+                    getSelf() );
+                timeoutSchedulersByQueueName.put( queueName, scheduler );
+                logger.debug( "Created timeouter for queue {}", queueName );
+            }
+
+            if ( shardAllocationSchedulersByQueueName.get( queueName ) == 
null) {
+                Cancellable scheduler = 
getContext().system().scheduler().schedule(
+                    Duration.create( 0, TimeUnit.MILLISECONDS ),
+                    Duration.create( 
qakkaFig.getShardAllocationCheckFrequencyMillis(), TimeUnit.MILLISECONDS ),
+                    self(),
+                    new ShardCheckRequest( queueName ),
+                    getContext().dispatcher(),
+                    getSelf() );
+                shardAllocationSchedulersByQueueName.put( queueName, scheduler 
);
+                logger.debug( "Created shard allocater for queue {}", 
queueName );
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index 509ccd9..86f94f1 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -20,52 +20,21 @@
 package org.apache.usergrid.persistence.qakka.distributed.actors;
 
 import akka.actor.UntypedActor;
-import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
-import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
-import org.apache.usergrid.persistence.qakka.MetricsService;
-import org.apache.usergrid.persistence.qakka.QakkaFig;
-import org.apache.usergrid.persistence.qakka.core.CassandraClient;
-import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
 import 
org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
-import 
org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
-import 
org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
-import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
-import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
-
 
 public class QueueRefresher extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( 
QueueRefresher.class );
 
-    private final ActorSystemFig  actorSystemFig;
-    private final InMemoryQueue   inMemoryQueue;
-    private final QakkaFig        qakkaFig;
-    private final MetricsService  metricsService;
-    private final CassandraClient cassandraClient;
+    private final QueueActorHelper queueActorHelper;
 
 
     @Inject
-    public QueueRefresher(
-        ActorSystemFig  actorSystemFig,
-        InMemoryQueue   inMemoryQueue,
-        QakkaFig        qakkaFig,
-        MetricsService  metricsService,
-        CassandraClient cassandraClient
-    ) {
-        this.actorSystemFig  = actorSystemFig;
-        this.inMemoryQueue   = inMemoryQueue;
-        this.qakkaFig        = qakkaFig;
-        this.metricsService  = metricsService;
-        this.cassandraClient = cassandraClient;
+    public QueueRefresher( QueueActorHelper queueActorHelper ) {
+        this.queueActorHelper = queueActorHelper;
     }
 
 
@@ -83,64 +52,8 @@ public class QueueRefresher extends UntypedActor {
         }
     }
 
-    Map<String, Long> startingShards = new HashMap<>();
-
 
     void queueRefresh( String queueName ) {
-
-        Timer.Context timer = metricsService.getMetricRegistry().timer( 
MetricsService.REFRESH_TIME).time();
-
-        try {
-
-            if (inMemoryQueue.size( queueName ) < 
qakkaFig.getQueueInMemorySize()) {
-
-                final Optional shardIdOptional;
-                final String shardKey =
-                    createShardKey( queueName, Shard.Type.DEFAULT, 
actorSystemFig.getRegionLocal() );
-                Long shardId = startingShards.get( shardKey );
-
-                if ( shardId != null ) {
-                    shardIdOptional = Optional.of( shardId );
-                } else {
-                    shardIdOptional = Optional.empty();
-                }
-
-                ShardIterator shardIterator = new ShardIterator(
-                    cassandraClient, queueName, 
actorSystemFig.getRegionLocal(),
-                    Shard.Type.DEFAULT, shardIdOptional );
-
-                UUID since = inMemoryQueue.getNewest( queueName );
-
-                String region = actorSystemFig.getRegionLocal();
-
-                MultiShardMessageIterator multiShardIterator = new 
MultiShardMessageIterator(
-                    cassandraClient, queueName, region, 
DatabaseQueueMessage.Type.DEFAULT,
-                    shardIterator, since);
-
-
-                int need = qakkaFig.getQueueInMemorySize() - 
inMemoryQueue.size( queueName );
-                int count = 0;
-
-                while ( multiShardIterator.hasNext() && count < need ) {
-                    DatabaseQueueMessage queueMessage = 
multiShardIterator.next();
-                    inMemoryQueue.add( queueName, queueMessage );
-                    count++;
-                }
-
-                startingShards.put( shardKey, shardId );
-
-                logger.debug("Refreshed queue {} region {} shard {} since {} 
found {}",
-                    queueName, region, shardId, since, count );
-            }
-
-        } finally {
-            timer.close();
-        }
-
+        queueActorHelper.queueRefresh( queueName );
     }
-
-    private String createShardKey(String queueName, Shard.Type type, String 
region ) {
-        return queueName + "_" + type + region;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 51f6fd3..96e8cab 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -80,22 +80,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     @Override
     public void init() {
 
-        try {
-            List<String> queues = queueManager.getListOfQueues();
-            for (String queueName : queues) {
-                initQueue( queueName );
-            }
-        } catch (InvalidQueryException e) {
-
-            if (e.getMessage().contains( "unconfigured columnfamily" )) {
-                logger.info( "Unable to initialize queues since system is bootstrapping.  " +
-                    "Queues will be initialized when created" );
-            } else {
-                throw e;
-            }
-
-        }
-
         StringBuilder logMessage = new StringBuilder();
         logMessage.append( "DistributedQueueServiceImpl initialized with config:\n" );
         Method[] methods = qakkaFig.getClass().getMethods();
@@ -114,15 +98,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
 
 
     @Override
-    public void initQueue(String queueName) {
-        logger.info("Initializing queue: {}", queueName);
-        QueueInitRequest request = new QueueInitRequest( queueName );
-        ActorRef clientActor = actorSystemManager.getClientActor();
-        clientActor.tell( request, null );
-    }
-
-
-    @Override
     public void refresh() {
         for ( String queueName : queueManager.getListOfQueues() ) {
             refreshQueue( queueName );
@@ -132,10 +107,13 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
 
     @Override
     public void refreshQueue(String queueName) {
-        logger.info("{} Requesting refresh for queue: {}", this, queueName);
-        QueueRefreshRequest request = new QueueRefreshRequest( queueName, false );
-        ActorRef clientActor = actorSystemManager.getClientActor();
-        clientActor.tell( request, null );
+        if ( qakkaFig.getInMemoryCache() ) {
+            logger.trace( "{} Requesting refresh for queue: {}", this, queueName );
+            QueueRefreshRequest request = new QueueRefreshRequest( queueName, false );
+            ActorRef clientActor = actorSystemManager.getClientActor();
+            clientActor.tell( request, null );
+        }
+
     }
 
 
@@ -157,6 +135,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
             String queueName, String sourceRegion, String destRegion, UUID messageId,
             Long deliveryTime, Long expirationTime ) {
 
+        logger.trace("Sending message to queue {} region {}", queueName, destRegion);
+
         Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_TOTAL ).time();
         try {
 
@@ -186,9 +166,11 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                                 logger.debug("SUCCESS after {} retries", retries );
                             }
 
-                            // send refresh-queue-if-empty message
-                            QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
-                            clientActor.tell( qrr, null );
+                            if ( qakkaFig.getInMemoryCache() ) {
+                                // send refresh-queue-if-empty message
+                                QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
+                                clientActor.tell( qrr, null );
+                            }
 
                             return qarm.getSendStatus();
 
@@ -280,7 +262,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                                 logger.warn( "getNextMessage {} SUCCESS after {} tries", queueName, tries );
                             }
                         }
-                        logger.trace("Returning queue {} messages {}", queueName, qprm.getQueueMessages().size());
                         return qprm.getQueueMessages();
 
 
@@ -335,14 +316,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     }
 
 
-    @Override
-    public Status clearMessages(String queueName) {
-
-        // TODO: implement clear queue
-        throw new UnsupportedOperationException();
-    }
-
-
     private Status sendMessageToLocalRouters( QakkaMessage message ) {
 
         int maxRetries = 5;

Reply via email to