Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue 7cc5c1c07 -> 198056227


Implemented queue clear (with test) and added some trace level logging


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/19805622
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/19805622
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/19805622

Branch: refs/heads/usergrid-1318-queue
Commit: 198056227fef2fa896ef2c58aadabb5ada76abfd
Parents: 7cc5c1c
Author: Dave Johnson <snoopd...@apache.org>
Authored: Wed Nov 2 16:26:00 2016 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Wed Nov 2 16:26:00 2016 -0400

----------------------------------------------------------------------
 .../usergrid/persistence/qakka/QakkaFig.java    |  4 +-
 .../persistence/qakka/api/QueueResource.java    |  5 +
 .../qakka/core/QueueMessageManager.java         |  5 +-
 .../core/impl/QueueMessageManagerImpl.java      | 88 +++++++++---------
 .../distributed/actors/QueueRefresher.java      |  4 +-
 .../impl/DistributedQueueServiceImpl.java       | 22 +----
 .../MultiShardMessageIterator.java              |  2 +-
 .../MessageCounterSerialization.java            |  2 +
 .../QueueMessageSerialization.java              |  9 +-
 .../impl/MessageCounterSerializationImpl.java   | 42 +++++++++
 .../impl/QueueMessageSerializationImpl.java     | 93 ++++++++++++++++++-
 .../queues/impl/QueueSerializationImpl.java     | 10 +-
 .../sharding/ShardCounterSerialization.java     |  6 +-
 .../sharding/ShardSerialization.java            |  2 +
 .../impl/ShardCounterSerializationImpl.java     | 53 ++++++++++-
 .../sharding/impl/ShardSerializationImpl.java   | 23 ++++-
 .../persistence/queue/LegacyQueueManager.java   |  1 -
 .../queue/impl/QakkaQueueManager.java           |  3 +-
 .../qakka/core/QueueMessageManagerTest.java     | 98 ++++++++++++++++++--
 .../distributed/QueueActorServiceTest.java      | 12 ++-
 .../rest/system/QueueSystemResource.java        | 81 +++++++++++++---
 21 files changed, 456 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index e6a8667..87b5c83 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -109,7 +109,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     /** How long to wait for response from queue actor before timing out and 
trying again */
     @Key(QUEUE_GET_TIMEOUT)
-    @Default("1")
+    @Default("3")
     int getGetTimeoutSeconds();
 
     /** Max number of times to retry call to queue writer for queue send 
operation */
@@ -119,7 +119,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     /** How long to wait for response from queue writer before timing out and 
trying again */
     @Key(QUEUE_SEND_TIMEOUT)
-    @Default("2")
+    @Default("5")
     int getSendTimeoutSeconds();
 
     /** Once counter reaches this value, write it to permanent storage */

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
index b609de3..ade5a70 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
@@ -23,11 +23,13 @@ import 
com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
 import com.google.common.base.Preconditions;
 import com.google.common.io.ByteStreams;
 import org.apache.usergrid.persistence.qakka.core.*;
+import org.apache.usergrid.persistence.qakka.exceptions.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import javax.ws.rs.*;
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
@@ -256,6 +258,9 @@ public class QueueResource {
                                    String contentType,
                                    ByteBuffer byteBuffer) {
 
+            if ( queueManager.getQueueConfig( queueName ) == null ) {
+                throw new NotFoundException( "Queue " + queueName + " not found" ) ;
+            }
 
             Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName 
), "Queue name is required" );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
index b540fce..252dc22 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
@@ -83,5 +83,8 @@ public interface QueueMessageManager {
      */
     QueueMessage getMessage(String queueName, UUID queueMessageId);
 
-    long getQueueDepth(String queueName);
+    /**
+     * Get queue depth for specified type, messages 'default' (available) or 'inflight'
+     */
+    long getQueueDepth( String queueName, DatabaseQueueMessage.Type type );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index 59e0ce0..59a14bd 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -23,10 +23,7 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.qakka.api.URIStrategy;
-import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
-import org.apache.usergrid.persistence.qakka.core.QueueManager;
-import org.apache.usergrid.persistence.qakka.core.QueueMessage;
-import org.apache.usergrid.persistence.qakka.core.QueueMessageManager;
+import org.apache.usergrid.persistence.qakka.core.*;
 import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.qakka.exceptions.BadRequestException;
 import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
@@ -35,6 +32,9 @@ import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.Databas
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
 import 
org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,10 +42,7 @@ import org.slf4j.LoggerFactory;
 import java.io.UnsupportedEncodingException;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 
 
 @Singleton
@@ -53,32 +50,37 @@ public class QueueMessageManagerImpl implements 
QueueMessageManager {
 
     private static final Logger logger = LoggerFactory.getLogger( 
QueueMessageManagerImpl.class );
 
-    private final ActorSystemFig            actorSystemFig;
-    private final QueueManager              queueManager;
-    private final QueueMessageSerialization queueMessageSerialization;
-    private final DistributedQueueService   distributedQueueService;
-    private final TransferLogSerialization  transferLogSerialization;
-    private final URIStrategy uriStrategy;
+    private final ActorSystemFig              actorSystemFig;
+    private final QueueManager                queueManager;
+    private final QueueMessageSerialization   queueMessageSerialization;
+    private final DistributedQueueService     distributedQueueService;
+    private final TransferLogSerialization    transferLogSerialization;
+    private final URIStrategy                 uriStrategy;
     private final MessageCounterSerialization messageCounterSerialization;
-
+    private final ShardSerialization          shardSerialization;
+    private final CassandraClient             cassandraClient;
 
     @Inject
     public QueueMessageManagerImpl(
-            ActorSystemFig            actorSystemFig,
-            QueueManager              queueManager,
-            QueueMessageSerialization queueMessageSerialization,
-            DistributedQueueService   distributedQueueService,
-            TransferLogSerialization  transferLogSerialization,
-            URIStrategy               uriStrategy,
-            MessageCounterSerialization messageCounterSerialization ) {
-
-        this.actorSystemFig            = actorSystemFig;
-        this.queueManager              = queueManager;
-        this.queueMessageSerialization = queueMessageSerialization;
-        this.distributedQueueService   = distributedQueueService;
-        this.transferLogSerialization  = transferLogSerialization;
-        this.uriStrategy               = uriStrategy;
+        ActorSystemFig              actorSystemFig,
+        QueueManager                queueManager,
+        QueueMessageSerialization   queueMessageSerialization,
+        DistributedQueueService     distributedQueueService,
+        TransferLogSerialization    transferLogSerialization,
+        URIStrategy                 uriStrategy,
+        MessageCounterSerialization messageCounterSerialization,
+        ShardSerialization          shardSerialization,
+        CassandraClient             cassandraClient ) {
+
+        this.actorSystemFig              = actorSystemFig;
+        this.queueManager                = queueManager;
+        this.queueMessageSerialization   = queueMessageSerialization;
+        this.distributedQueueService     = distributedQueueService;
+        this.transferLogSerialization    = transferLogSerialization;
+        this.uriStrategy                 = uriStrategy;
         this.messageCounterSerialization = messageCounterSerialization;
+        this.shardSerialization          = shardSerialization;
+        this.cassandraClient             = cassandraClient;
     }
 
 
@@ -86,6 +88,10 @@ public class QueueMessageManagerImpl implements 
QueueMessageManager {
     public void sendMessages(String queueName, List<String> destinationRegions,
             Long delayMs, Long expirationSecs, String contentType, ByteBuffer 
messageData) {
 
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue " + queueName + " not found" );
+        }
+
         // TODO: implement delay and expiration
 
 //        Preconditions.checkArgument(delayMs == null || delayMs > 0L,
@@ -93,10 +99,6 @@ public class QueueMessageManagerImpl implements 
QueueMessageManager {
 //        Preconditions.checkArgument(expirationSecs == null || expirationSecs 
> 0L,
 //                "Expiration seconds must be greater than zero");
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
-
         // get current time
         Long currentTimeMs = System.currentTimeMillis();
 
@@ -229,16 +231,16 @@ public class QueueMessageManagerImpl implements 
QueueMessageManager {
     }
 
 
-    @Override
-    public void clearMessages(String queueName) {
-
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
+    // TODO: implement delete of message data too
+//    @Override
+//    public void clearMessageData( queueName ) {
+//    }
 
-        // TODO: implement clearMessages
 
-        throw new UnsupportedOperationException( "clearMessages not yet implemented" );
+    @Override
+    public void clearMessages( String queueName ) {
+        queueMessageSerialization.deleteAllMessages( queueName );
+        shardSerialization.deleteAllShards( queueName, actorSystemFig.getRegionLocal() );
     }
 
 
@@ -291,8 +293,8 @@ public class QueueMessageManagerImpl implements 
QueueMessageManager {
 
 
     @Override
-    public long getQueueDepth(String queueName) {
-        return messageCounterSerialization.getCounterValue( queueName, 
DatabaseQueueMessage.Type.DEFAULT );
+    public long getQueueDepth( String queueName, DatabaseQueueMessage.Type 
type ) {
+        return messageCounterSerialization.getCounterValue( queueName, type );
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index d8faeb2..509ccd9 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -129,8 +129,8 @@ public class QueueRefresher extends UntypedActor {
 
                 startingShards.put( shardKey, shardId );
 
-//                logger.debug("Refreshed queue {} region {} shard {} since {} 
found {}",
-//                    queueName, region, shardId, since, count );
+                logger.debug("Refreshed queue {} region {} shard {} since {} 
found {}",
+                    queueName, region, shardId, since, count );
             }
 
         } finally {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index e2c5c2c..51f6fd3 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -160,10 +160,6 @@ public class DistributedQueueServiceImpl implements 
DistributedQueueService {
         Timer.Context timer = metricsService.getMetricRegistry().timer( 
MetricsService.SEND_TIME_TOTAL ).time();
         try {
 
-            if ( queueManager.getQueueConfig( queueName ) == null ) {
-                throw new NotFoundException( "Queue not found: " + queueName );
-            }
-
             int maxRetries = qakkaFig.getMaxSendRetries();
             int retries = 0;
 
@@ -255,10 +251,6 @@ public class DistributedQueueServiceImpl implements 
DistributedQueueService {
 
     public Collection<DatabaseQueueMessage> getNextMessagesInternal( String 
queueName, int count ) {
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
-
         if ( actorSystemManager.getClientActor() == null || 
!actorSystemManager.isReady() ) {
             logger.error("Akka Actor System is not ready yet for requests.");
             return Collections.EMPTY_LIST;
@@ -288,7 +280,7 @@ public class DistributedQueueServiceImpl implements 
DistributedQueueService {
                                 logger.warn( "getNextMessage {} SUCCESS after 
{} tries", queueName, tries );
                             }
                         }
-                        logger.debug("Returning queue {} messages {}", 
queueName, qprm.getQueueMessages().size());
+                        logger.trace("Returning queue {} messages {}", 
queueName, qprm.getQueueMessages().size());
                         return qprm.getQueueMessages();
 
 
@@ -325,10 +317,6 @@ public class DistributedQueueServiceImpl implements 
DistributedQueueService {
         Timer.Context timer = metricsService.getMetricRegistry().timer( 
MetricsService.ACK_TIME_TOTAL ).time();
         try {
 
-            if ( queueManager.getQueueConfig( queueName ) == null ) {
-                throw new NotFoundException( "Queue not found: " + queueName );
-            }
-
             QueueAckRequest message = new QueueAckRequest( queueName, 
queueMessageId );
             return sendMessageToLocalRouters( message );
 
@@ -342,10 +330,6 @@ public class DistributedQueueServiceImpl implements 
DistributedQueueService {
     @Override
     public Status requeueMessage(String queueName, UUID messageId) {
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
-
         QueueAckRequest message = new QueueAckRequest( queueName, messageId );
         return sendMessageToLocalRouters( message );
     }
@@ -354,10 +338,6 @@ public class DistributedQueueServiceImpl implements 
DistributedQueueService {
     @Override
     public Status clearMessages(String queueName) {
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
-
         // TODO: implement clear queue
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
index 6a066e9..c521cef 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
@@ -148,7 +148,7 @@ public class MultiShardMessageIterator implements 
Iterator<DatabaseQueueMessage>
 
         List<Row> rows = 
cassandraClient.getQueueMessageSession().execute(query).all();
 
-        //logger.debug("Query got {}: {}", rows.size(), query);
+        logger.trace("results {} from query {}", rows.size(), 
query.toString());
 
         if ( (rows == null || rows.size() == 0) && shardIterator.hasNext()) {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
index 6c81863..64b2fce 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
@@ -28,4 +28,6 @@ public interface MessageCounterSerialization extends 
Migration {
     void decrementCounter(String queueName, DatabaseQueueMessage.Type type, 
long decrement);
 
     long getCounterValue(String name, DatabaseQueueMessage.Type type);
+
+    void resetCounter(String queueName, DatabaseQueueMessage.Type type);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
index 434c965..0791e5c 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
@@ -27,7 +27,7 @@ import java.util.UUID;
 public interface QueueMessageSerialization extends Migration {
 
     /**
-     * Write message to storage..
+     * Write message to storage.
      * If queueMessageId or createdTime are null, then values will be 
generated.
      */
     UUID writeMessage(final DatabaseQueueMessage message);
@@ -58,6 +58,13 @@ public interface QueueMessageSerialization extends Migration 
{
     void putInflight( DatabaseQueueMessage queueMessage );
 
     /**
+     * Delete all queue messages in the specified queue and in the current 
"local" region.
+     * Impacts messages available and messages inflight.
+     * @param queueName Name of queue to clear.
+     */
+    void deleteAllMessages( String queueName );
+
+    /**
      * Remove message from inflight table, write message to available table.
      */
     void timeoutInflight( DatabaseQueueMessage queueMessage );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index 36175c5..5c66f43 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -34,6 +34,7 @@ import 
org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,6 +104,11 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
             increment.set( 0L );
             decrement.set( 0L );
         }
+        void reset() {
+            this.baseCount = 0;
+            this.increment.set( 0L );
+            this.decrement.set( 0L );
+        }
         public long value() {
 
             // return totalInMemoryCount.get(); // for testing using just 
in-memory counter:
@@ -226,6 +232,42 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
     }
 
 
+    @Override
+    public void resetCounter(String queueName, DatabaseQueueMessage.Type type) {
+
+        // this sucks: "You cannot index, delete, or re-add a counter column"
+        // https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html
+        // so instead we decrement or increment the counter to zero
+
+        // get value first, before resetting in memory counter
+        long value = getCounterValue( queueName, type );
+
+        String key = buildKey( queueName, type );
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+        if ( inMemoryCount != null ) {
+            inMemoryCount.reset();
+        }
+
+        if ( value < 0 ) {
+
+            Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS )
+                .where( QueryBuilder.eq(   COLUMN_QUEUE_NAME, queueName ) )
+                .and(   QueryBuilder.eq(   COLUMN_MESSAGE_TYPE, 
type.toString() ) )
+                .with(  QueryBuilder.incr( COLUMN_COUNTER_VALUE, -1 * value ) ); // incr must be positive
+            cassandraClient.getQueueMessageSession().execute( update );
+
+        } else {
+
+            Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS )
+                .where( QueryBuilder.eq(   COLUMN_QUEUE_NAME, queueName ) )
+                .and(   QueryBuilder.eq(   COLUMN_MESSAGE_TYPE, 
type.toString() ) )
+                .with(  QueryBuilder.decr( COLUMN_COUNTER_VALUE, value ) );
+            cassandraClient.getQueueMessageSession().execute( update );
+        }
+
+    }
+
+
     void incrementCounterInStorage( String queueName, 
DatabaseQueueMessage.Type type, long increment ) {
 
         Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index a174dd0..b1b57ae 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -41,14 +41,13 @@ import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.Message
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
 import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
 import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.java8.FuturesConvertersImpl;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
+import java.util.*;
 
 
 public class QueueMessageSerializationImpl implements 
QueueMessageSerialization {
@@ -136,6 +135,8 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
     @Override
     public UUID writeMessage(final DatabaseQueueMessage message) {
 
+        logger.trace("write message {}", message.getQueueMessageId());
+
         final UUID queueMessageId =  message.getQueueMessageId() == null ?
                 QakkaUtils.getTimeUuid() : message.getQueueMessageId();
 
@@ -151,8 +152,8 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
         Statement insert = createWriteMessageStatement( message );
         cassandraClient.getQueueMessageSession().execute(insert);
 
-//        logger.debug("Wrote queue {} queue message {} shardId {}",
-//            message.getQueueName(), message.getQueueMessageId(), 
message.getShardId() );
+        logger.trace("Wrote queue {} queue message {} shardId {}",
+            message.getQueueName(), message.getQueueMessageId(), 
message.getShardId() );
 
         shardCounterSerialization.incrementCounter( message.getQueueName(), 
shardType, message.getShardId(), 1 );
 
@@ -174,6 +175,8 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
             return null;
         }
 
+        logger.trace("loadMessage {}", queueMessageId);
+
         final long shardId;
         if ( shardIdOrNull == null ) {
             Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( 
type ) ?
@@ -223,6 +226,9 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
             final DatabaseQueueMessage.Type type,
             final UUID queueMessageId ) {
 
+
+        logger.trace("deleteMessage {}", queueMessageId);
+
         Statement delete = createDeleteMessageStatement( queueName, region, 
null, type,queueMessageId);
         cassandraClient.getQueueMessageSession().execute( delete );
 
@@ -233,6 +239,8 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
     @Override
     public DatabaseQueueMessageBody loadMessageData(final UUID messageId ){
 
+        logger.trace("loadMessageData {}", messageId);
+
         Clause messageIdClause = QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId 
);
 
         Statement select = QueryBuilder.select().from( 
TABLE_MESSAGE_DATA).where(messageIdClause);
@@ -252,6 +260,8 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
     public void writeMessageData( final UUID messageId, final 
DatabaseQueueMessageBody messageBody ) {
         Preconditions.checkArgument(QakkaUtils.isTimeUuid(messageId), 
"MessageId is not a type 1 UUID");
 
+        logger.trace("writeMessageData {}", messageId);
+
         Statement insert = QueryBuilder.insertInto(TABLE_MESSAGE_DATA)
                 .value( COLUMN_MESSAGE_ID, messageId)
                 .value( COLUMN_MESSAGE_DATA, messageBody.getBlob())
@@ -265,6 +275,8 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
     @Override
     public void deleteMessageData( final UUID messageId ) {
 
+        logger.trace("deleteMessageData {}", messageId);
+
         Clause messageIdClause = QueryBuilder.eq(COLUMN_MESSAGE_ID, messageId);
         Statement delete = 
QueryBuilder.delete().from(TABLE_MESSAGE_DATA).where(messageIdClause);
 
@@ -275,6 +287,8 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
     @Override
     public void putInflight( DatabaseQueueMessage message ) {
 
+        logger.trace("putInflight {}", message.getQueueMessageId());
+
         // create statement to write queue message to inflight table
 
         DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage(
@@ -319,8 +333,77 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
 
 
     @Override
+    public void deleteAllMessages( String queueName ) {
+
+        logger.trace("deleteAllMessages " + queueName);
+
+        Shard.Type[] shardTypes = new Shard.Type[] {Shard.Type.DEFAULT, 
Shard.Type.INFLIGHT};
+
+        // batch up and then execute delete statements
+        BatchStatement deleteAllBatch = new BatchStatement();
+        for ( Shard.Type shardType : shardTypes ) {
+            ShardIterator defaultShardIterator = new ShardIterator( 
cassandraClient,
+                queueName, actorSystemFig.getRegionLocal(), shardType, 
Optional.empty() );
+
+            while (defaultShardIterator.hasNext()) {
+                Shard shard = defaultShardIterator.next();
+                deleteAllBatch.add( createDeleteAllMessagesStatement( shard ) 
);
+
+                logger.trace("added queueName {} type {} shard {}",
+                    queueName, shardType, shard.getShardId() );
+            }
+        }
+
+        cassandraClient.getQueueMessageSession().execute( deleteAllBatch );
+        logger.trace("deleted messages in queue: " + queueName);
+
+        // clear counters, we only want this to happen after successful 
deletion
+        for ( Shard.Type shardType : shardTypes ) {
+
+            ShardIterator defaultShardIterator = new ShardIterator( 
cassandraClient,
+                queueName, actorSystemFig.getRegionLocal(), shardType, 
Optional.empty() );
+
+            while (defaultShardIterator.hasNext()) {
+                Shard shard = defaultShardIterator.next();
+
+                shardCounterSerialization.resetCounter( shard );
+
+                DatabaseQueueMessage.Type type = Shard.Type.DEFAULT.equals( 
shardType )
+                    ? DatabaseQueueMessage.Type.DEFAULT : 
DatabaseQueueMessage.Type.INFLIGHT;
+                messageCounterSerialization.resetCounter( queueName, type );
+
+                logger.trace("reset counters for queueName {} type {} shard 
{}",
+                    queueName, shardType, shard.getShardId() );
+            }
+        }
+
+        // TODO: delete message data (separate method)
+    }
+
+
+    private Statement createDeleteAllMessagesStatement( Shard shard ) {
+
+        Clause queueNameClause = QueryBuilder.eq(      COLUMN_QUEUE_NAME, 
shard.getQueueName() );
+        Clause regionClause = QueryBuilder.eq(         COLUMN_REGION, 
shard.getRegion() );
+        Clause shardIdClause = QueryBuilder.eq(        COLUMN_SHARD_ID, 
shard.getShardId() );
+
+        DatabaseQueueMessage.Type dbqmType = Shard.Type.DEFAULT.equals( 
shard.getType() )
+            ? DatabaseQueueMessage.Type.DEFAULT : 
DatabaseQueueMessage.Type.INFLIGHT;
+
+        Statement deleteAll = QueryBuilder.delete().from( getTableName( 
dbqmType ))
+            .where(queueNameClause)
+            .and(regionClause)
+            .and(shardIdClause);
+
+        return deleteAll;
+    }
+
+
+    @Override
     public void timeoutInflight( DatabaseQueueMessage message ) {
 
+        logger.trace("timeoutInflight {}", message.getQueueMessageId() );
+
         // create statement to write queue message back to available table, 
with new UUID
 
         UUID newQueueMessageId = QakkaUtils.getTimeUuid();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
index 17a48c6..d3a46aa 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
 
 public class QueueSerializationImpl implements QueueSerialization {
 
-    private static final Logger logger = LoggerFactory.getLogger( 
QueueMessageSerializationImpl.class );
+    private static final Logger logger = LoggerFactory.getLogger( 
QueueSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
     private final CassandraConfig cassandraConfig;
@@ -83,6 +83,8 @@ public class QueueSerializationImpl implements 
QueueSerialization {
     @Override
     public void writeQueue(DatabaseQueue queue) {
 
+        logger.trace( "writeQueue " + queue.getName() );
+
         Statement insert = QueryBuilder.insertInto(TABLE_QUEUES)
                 .value(COLUMN_QUEUE_NAME, queue.getName())
                 .value(COLUMN_REGIONS, queue.getRegions())
@@ -100,6 +102,8 @@ public class QueueSerializationImpl implements 
QueueSerialization {
     @Override
     public DatabaseQueue getQueue(String name) {
 
+        logger.trace( "getQueue " + name );
+
         Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, name);
 
         Statement query = QueryBuilder.select().all().from(TABLE_QUEUES)
@@ -127,6 +131,8 @@ public class QueueSerializationImpl implements 
QueueSerialization {
     @Override
     public void deleteQueue(String name) {
 
+        logger.trace( "deleteQueue " + name );
+
         Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, name);
 
         Statement delete = QueryBuilder.delete().from(TABLE_QUEUES)
@@ -138,6 +144,8 @@ public class QueueSerializationImpl implements 
QueueSerialization {
     @Override
     public List<String> getListOfQueues() {
 
+        logger.trace( "getListOfQueues " );
+
         Statement select = QueryBuilder.select().all().from( TABLE_QUEUES );
         ResultSet rs = cassandraClient.getApplicationSession().execute( select 
);
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java
index c29c548..0e5e279 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java
@@ -23,7 +23,9 @@ import 
org.apache.usergrid.persistence.core.migration.schema.Migration;
 
 public interface ShardCounterSerialization extends Migration {
 
-    void incrementCounter(String queueName, Shard.Type type, long shardId, 
long increment);
+    void incrementCounter( String queueName, Shard.Type type, long shardId, 
long increment);
 
-    long getCounterValue(String name, Shard.Type type, long shardId);
+    long getCounterValue( String name, Shard.Type type, long shardId);
+
+    void resetCounter( Shard shard );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java
index c0173ab..91fe0e1 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java
@@ -30,6 +30,8 @@ public interface ShardSerialization extends Migration {
 
     void deleteShard(final Shard shard);
 
+    void deleteAllShards(String queueName, String region);
+
     void updateShardPointer(final Shard shard);
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
index 276498d..8bb262b 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
@@ -55,16 +55,16 @@ public class ShardCounterSerializationImpl implements 
ShardCounterSerialization
 
     final static String TABLE_COUNTERS       = "shard_counters";
     final static String COLUMN_QUEUE_NAME    = "queue_name";
+    final static String COLUMN_SHARD_TYPE    = "shard_type";
     final static String COLUMN_SHARD_ID      = "shard_id";
     final static String COLUMN_COUNTER_VALUE = "counter_value";
-    final static String COLUMN_SHARD_TYPE    = "shard_type";
 
     static final String CQL =
         "CREATE TABLE IF NOT EXISTS shard_counters ( " +
-            "counter_value counter, " +
             "queue_name    varchar, " +
             "shard_type    varchar, " +
-            "shard_id      bigint, " +
+            "shard_id      bigint, "  +
+            "counter_value counter, " +
             "PRIMARY KEY (queue_name, shard_type, shard_id) " +
     ");  ";
 
@@ -85,6 +85,10 @@ public class ShardCounterSerializationImpl implements 
ShardCounterSerialization
         void setBaseCount( long baseCount ) {
             this.baseCount = baseCount;
         }
+        void reset() {
+            this.baseCount = 0;
+            this.increment.set( 0L );
+        }
     }
 
     private Map<String, InMemoryCount> inMemoryCounters = new 
ConcurrentHashMap<>(200);
@@ -159,6 +163,49 @@ public class ShardCounterSerializationImpl implements 
ShardCounterSerialization
         return inMemoryCounters.get( key ).value();
     }
 
+
+    @Override
+    public void resetCounter( Shard shard ) {
+
+        // Cassandra limitation: "You cannot index, delete, or re-add a counter column"
+        // 
https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html
+        // so instead we decrement or increment the counter to zero
+
+        String queueName = shard.getQueueName();
+        Shard.Type type = shard.getType();
+        long shardId = shard.getShardId();
+
+        // get value first, before resetting in memory counter
+        long value = getCounterValue( shard.getQueueName(), shard.getType(), 
shard.getShardId() );
+
+        String key = queueName + type + shardId;
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+        if ( inMemoryCount != null ) {
+            inMemoryCount.reset();
+        }
+
+        if ( value < 0 ) {
+
+            Statement update = QueryBuilder.update( TABLE_COUNTERS )
+                .where( QueryBuilder.eq(   COLUMN_QUEUE_NAME, queueName ) )
+                .and(   QueryBuilder.eq(   COLUMN_SHARD_TYPE, type.toString() 
) )
+                .and(   QueryBuilder.eq(   COLUMN_SHARD_ID, shardId ) )
+                .with(  QueryBuilder.incr( COLUMN_COUNTER_VALUE, -1 * value ) 
); // incr must be positive
+            cassandraClient.getQueueMessageSession().execute( update );
+
+        } else {
+
+            Statement update = QueryBuilder.update( TABLE_COUNTERS )
+                .where( QueryBuilder.eq(   COLUMN_QUEUE_NAME, queueName ) )
+                .and(   QueryBuilder.eq(   COLUMN_SHARD_TYPE, type.toString() 
) )
+                .and(   QueryBuilder.eq(   COLUMN_SHARD_ID, shardId ) )
+                .with(  QueryBuilder.decr( COLUMN_COUNTER_VALUE, value ) );
+            cassandraClient.getQueueMessageSession().execute( update );
+        }
+
+    }
+
+
     void incrementCounterInStorage( String queueName, Shard.Type type, long 
shardId, long increment ) {
 
         Statement update = QueryBuilder.update( TABLE_COUNTERS )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
index cc5caab..501a6ca 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.usergrid.persistence.qakka.serialization.sharding.impl;
 
+import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.Assignment;
@@ -140,8 +141,6 @@ public class ShardSerializationImpl implements 
ShardSerialization {
         Clause activeClause = QueryBuilder.eq(COLUMN_ACTIVE, 1);
         Clause shardIdClause = QueryBuilder.eq(COLUMN_SHARD_ID, 
shard.getShardId());
 
-
-
         Statement delete = 
QueryBuilder.delete().from(getTableName(shard.getType()))
                 .where(queueNameClause)
                 .and(regionClause)
@@ -149,9 +148,29 @@ public class ShardSerializationImpl implements 
ShardSerialization {
                 .and(shardIdClause);
 
         cassandraClient.getQueueMessageSession().execute(delete);
+    }
+
+
+    @Override
+    public void deleteAllShards(String queueName, String region) {
+
+        BatchStatement batch = new BatchStatement();
 
+        Shard.Type[] shardTypes = new Shard.Type[]{Shard.Type.DEFAULT, 
Shard.Type.INFLIGHT};
+
+        for (Shard.Type shardType : shardTypes) {
+
+            Statement delete = QueryBuilder.delete().from( getTableName( 
shardType ) )
+                .where( QueryBuilder.eq(COLUMN_QUEUE_NAME, queueName) )
+                .and( QueryBuilder.eq(COLUMN_REGION, region) );
+
+            batch.add( delete );
+        }
+
+        cassandraClient.getQueueMessageSession().execute( batch );
     }
 
+
     public void updateShardPointer(final Shard shard){
 
         Assignment assignment = QueryBuilder.set(COLUMN_POINTER, 
shard.getPointer());

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
index 053dd36..e38d3bc 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
@@ -36,7 +36,6 @@ public interface LegacyQueueManager {
 
     /**
      * get the queue depth
-     * @return
      */
     long getQueueDepth();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index 2983340..b81e888 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -24,6 +24,7 @@ import com.google.inject.assistedinject.Assisted;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.*;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.queue.LegacyQueueFig;
 import org.apache.usergrid.persistence.queue.LegacyQueueManager;
@@ -145,7 +146,7 @@ public class QakkaQueueManager implements 
LegacyQueueManager {
 
         createQueueIfNecessary();
 
-        return queueMessageManager.getQueueDepth( scope.getName() );
+        return queueMessageManager.getQueueDepth( scope.getName(), 
DatabaseQueueMessage.Type.DEFAULT );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 5f0216f..d944b3d 100644
--- 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -38,6 +38,10 @@ import 
org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSeri
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardCounterSerializationImpl;
 import 
org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLog;
 import 
org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
 import org.apache.usergrid.persistence.queue.TestModule;
@@ -47,10 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.stream.Collectors;
 
 
@@ -175,17 +176,19 @@ public class QueueMessageManagerTest extends AbstractTest 
{
                     DataType.serializeValue( "{}", 
ProtocolVersion.NEWEST_SUPPORTED ) );
             }
 
+            DatabaseQueueMessage.Type type = DatabaseQueueMessage.Type.DEFAULT;
+
             int maxRetries = 15;
             int retries = 0;
             while (retries++ < maxRetries) {
                 distributedQueueService.refresh();
-                if (qmm.getQueueDepth( queueName ) == 40) {
+                if (qmm.getQueueDepth( queueName, type ) == numMessages) {
                     break;
                 }
                 Thread.sleep( 500 );
             }
 
-            Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
+            Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName, 
type ) );
 
             // get all messages from queue
 
@@ -295,4 +298,87 @@ public class QueueMessageManagerTest extends AbstractTest {
         distributedQueueService.shutdown();
     }
 
+
+    @Test
+    public void testClearQueue() throws Exception {
+
+        Injector injector = getInjector();
+
+        DistributedQueueService distributedQueueService = 
injector.getInstance( DistributedQueueService.class );
+        ActorSystemFig actorSystemFig = injector.getInstance( 
ActorSystemFig.class );
+
+        String region = actorSystemFig.getRegionLocal();
+        App app = injector.getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        // create some number of queue messages
+
+        QueueManager queueManager = injector.getInstance( QueueManager.class );
+
+        String queueName = "queue_testClearQueue" + 
RandomStringUtils.randomAlphanumeric( 15 );
+
+        try {
+            QueueMessageManager qmm = injector.getInstance( 
QueueMessageManager.class );
+            queueManager.createQueue( new Queue( queueName, "test-type", 
region, region, 0L, 5, 10, null ) );
+
+            int numMessages = 40;
+
+            for (int i = 0; i < numMessages; i++) {
+                qmm.sendMessages(
+                    queueName,
+                    Collections.singletonList( region ),
+                    null, // delay
+                    null, // expiration
+                    "application/json",
+                    DataType.serializeValue( "{}", 
ProtocolVersion.NEWEST_SUPPORTED ) );
+            }
+
+            DatabaseQueueMessage.Type available = 
DatabaseQueueMessage.Type.DEFAULT;
+            DatabaseQueueMessage.Type inflight = 
DatabaseQueueMessage.Type.INFLIGHT;
+
+            int maxRetries = 15;
+            int retries = 0;
+            while (retries++ < maxRetries) {
+                distributedQueueService.refresh();
+                if (qmm.getQueueDepth( queueName, available ) == numMessages) {
+                    break;
+                }
+                Thread.sleep( 500 );
+            }
+
+            Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName, 
available ) );
+            Assert.assertEquals( 0, qmm.getQueueDepth( queueName, inflight ) );
+
+            // get half of the messages from the queue
+            List<QueueMessage> messages = qmm.getNextMessages( queueName, 
numMessages/2 );
+            Assert.assertEquals( numMessages/2, messages.size() );
+
+            Thread.sleep(500);
+
+            // now half messages should be available and half in flight
+            Assert.assertEquals( numMessages/2, qmm.getQueueDepth( queueName, 
available ) );
+            Assert.assertEquals( numMessages/2, qmm.getQueueDepth( queueName, 
inflight ) );
+
+            qmm.clearMessages( queueName );
+
+            // counters should show zero in queue
+            Assert.assertEquals( 0, qmm.getQueueDepth( queueName, available ) 
);
+            Assert.assertEquals( 0, qmm.getQueueDepth( queueName, inflight ) );
+
+            // check that all shards are gone
+
+            CassandraClient cassandraClient = injector.getInstance( 
CassandraClient.class );
+
+            ShardIterator defaultShardIterator = new ShardIterator( 
cassandraClient,
+                queueName, actorSystemFig.getRegionLocal(), 
Shard.Type.DEFAULT, Optional.empty() );
+            Assert.assertTrue( !defaultShardIterator.hasNext() );
+
+            ShardIterator inflightShardIterator = new ShardIterator( 
cassandraClient,
+                queueName, actorSystemFig.getRegionLocal(), 
Shard.Type.INFLIGHT, Optional.empty() );
+            Assert.assertTrue( !inflightShardIterator.hasNext() );
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index f5512e5..053c093 100644
--- 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -152,12 +152,14 @@ public class QueueActorServiceTest extends AbstractTest {
                     queueName, region, region, messageId, null, null );
             }
 
+            DatabaseQueueMessage.Type type = DatabaseQueueMessage.Type.DEFAULT;
+
             int maxRetries = 10;
             int retries = 0;
             long count = 0;
             while (retries++ < maxRetries) {
                 distributedQueueService.refresh();
-                count = queueMessageManager.getQueueDepth(  queueName );
+                count = queueMessageManager.getQueueDepth(  queueName, type );
                 if ( count == 100 ) {
                     break;
                 }
@@ -167,16 +169,16 @@ public class QueueActorServiceTest extends AbstractTest {
             Assert.assertEquals( 100, count );
 
             Assert.assertEquals( 25, distributedQueueService.getNextMessages( 
queueName, 25 ).size() );
-            Assert.assertEquals( 75, queueMessageManager.getQueueDepth(  
queueName ) );
+            Assert.assertEquals( 75, queueMessageManager.getQueueDepth(  
queueName, type ) );
 
             Assert.assertEquals( 25, distributedQueueService.getNextMessages( 
queueName, 25 ).size() );
-            Assert.assertEquals( 50, queueMessageManager.getQueueDepth(  
queueName ) );
+            Assert.assertEquals( 50, queueMessageManager.getQueueDepth(  
queueName, type ) );
 
             Assert.assertEquals( 25, distributedQueueService.getNextMessages( 
queueName, 25 ).size() );
-            Assert.assertEquals( 25, queueMessageManager.getQueueDepth(  
queueName ) );
+            Assert.assertEquals( 25, queueMessageManager.getQueueDepth(  
queueName, type ) );
 
             Assert.assertEquals( 25, distributedQueueService.getNextMessages( 
queueName, 25 ).size() );
-            Assert.assertEquals( 0,  queueMessageManager.getQueueDepth(  
queueName ) );
+            Assert.assertEquals( 0,  queueMessageManager.getQueueDepth(  
queueName, type ) );
 
             distributedQueueService.shutdown();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
----------------------------------------------------------------------
diff --git 
a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
 
b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
index 86cd387..a9d3e1b 100644
--- 
a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
+++ 
b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.rest.system;
 
 import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.node.BooleanNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
@@ -31,6 +32,7 @@ import 
org.apache.usergrid.persistence.qakka.core.QueueMessageManager;
 import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
 import org.apache.usergrid.persistence.qakka.core.impl.QueueManagerImpl;
 import org.apache.usergrid.persistence.qakka.core.impl.QueueMessageManagerImpl;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.apache.usergrid.rest.AbstractContextResource;
 import org.apache.usergrid.rest.ApiResponse;
 import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
@@ -91,12 +93,11 @@ public class QueueSystemResource extends 
AbstractContextResource {
 
     @GET
     @RequireSystemAccess
-    @Path("info")
-    public ApiResponse getQueueInfo(
-        @QueryParam("callback") @DefaultValue("callback") String callback ) {
+    @Path("metrics")
+    public ApiResponse getQueueMetrics() {
 
         ApiResponse response = createApiResponse();
-        response.setAction( "get queue info" );
+        response.setAction( "get queue metrics" );
 
         MetricsService metricsService = injector.getInstance( 
MetricsService.class );
 
@@ -104,7 +105,7 @@ public class QueueSystemResource extends 
AbstractContextResource {
         final long nano = 1000000000;
 
         Map<String, Object> info = new HashMap<String, Object>() {{
-            put( "name", "Queue Info" );
+            put( "name", "Queue Metrics" );
             try {
                 put( "host", InetAddress.getLocalHost().getHostName() );
             } catch (UnknownHostException e) {
@@ -125,6 +126,25 @@ public class QueueSystemResource extends 
AbstractContextResource {
             }
         }};
 
+        response.setProperty( "data", info );
+
+        return response;
+
+    }
+
+    @GET
+    @RequireSystemAccess
+    @Path("info")
+    public ApiResponse getQueueInfo(
+        @QueryParam("callback") @DefaultValue("callback") String callback ) {
+
+        ApiResponse response = createApiResponse();
+        response.setAction( "get queue info" );
+
+        Map<String, Object> info = new HashMap<String, Object>() {{
+            put( "name", "Queue Info" );
+        }};
+
         QueueManager queueManager               = injector.getInstance( 
QueueManagerImpl.class );
         QueueMessageManager queueMessageManager = injector.getInstance( 
QueueMessageManagerImpl.class );
         InMemoryQueue inMemoryQueue             = injector.getInstance( 
InMemoryQueue.class );
@@ -133,16 +153,23 @@ public class QueueSystemResource extends AbstractContextResource {
         final List<String> listOfQueues = queueManager.getListOfQueues();
         for ( String queueName : listOfQueues ) {
 
-            Map<String, Object> queueInfo = new HashMap<>();
+            try {
+                Map<String, Object> queueInfo = new HashMap<>();
 
-            queueInfo.put("name", queueName );
-            queueInfo.put("depth", queueMessageManager.getQueueDepth( 
queueName ));
-            queueInfo.put("inmemory", inMemoryQueue.size( queueName ));
+                queueInfo.put( "name", queueName );
+                queueInfo.put( "depth",
+                    queueMessageManager.getQueueDepth( queueName, 
DatabaseQueueMessage.Type.DEFAULT ) );
+                queueInfo.put( "inflight",
+                    queueMessageManager.getQueueDepth( queueName, 
DatabaseQueueMessage.Type.INFLIGHT ) );
+                queueInfo.put( "inmemory", inMemoryQueue.size( queueName ) );
 
-            UUID newest = inMemoryQueue.getNewest( queueName );
-            queueInfo.put("since", newest == null ? "null" : 
newest.timestamp());
+                UUID newest = inMemoryQueue.getNewest( queueName );
+                queueInfo.put( "since", newest == null ? "null" : 
newest.timestamp() );
 
-            queues.add( queueInfo );
+                queues.add( queueInfo );
+            } catch ( Exception e ) {
+                logger.error("Error getting queue info for queue: " + 
queueName, e);
+            }
         }
 
         info.put("queues", queues);
@@ -152,4 +179,34 @@ public class QueueSystemResource extends AbstractContextResource {
         return response;
     }
 
+
+    /**
+     * Clears the named queue by calling QueueMessageManager.clearMessages(); requires system access.
+     * Throws IllegalArgumentException when queueName is null and NotFoundException when the queue
+     * has no config. The callback query parameter is accepted but not used by this method.
+     */
+    @POST
+    @RequireSystemAccess
+    @Path("clear/{queueName}")
+    public ApiResponse clearQueue(
+        @PathParam("queueName") String queueName,
+        @QueryParam("callback") @DefaultValue("callback") String callback ) {
+
+        if ( queueName == null ) {
+            throw new IllegalArgumentException( "queueName not specified in path" );
+        }
+
+        QueueManager queueManager = injector.getInstance( QueueManager.class );
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
+        }
+        QueueMessageManager queueMessageManager = injector.getInstance( QueueMessageManagerImpl.class );
+        ApiResponse response = createApiResponse();
+        response.setAction( "clear queue: " + queueName );
+
+        logger.debug( "Clearing queue: " + queueName );
+        queueMessageManager.clearMessages( queueName );
+        response.setProperty( "cleared", Boolean.TRUE );
+        return response;
+    }
+
 }

Reply via email to