Repository: usergrid Updated Branches: refs/heads/usergrid-1318-queue 7cc5c1c07 -> 198056227
Implemented queue clear (with test) and added some trace level logging Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/19805622 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/19805622 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/19805622 Branch: refs/heads/usergrid-1318-queue Commit: 198056227fef2fa896ef2c58aadabb5ada76abfd Parents: 7cc5c1c Author: Dave Johnson <snoopd...@apache.org> Authored: Wed Nov 2 16:26:00 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Wed Nov 2 16:26:00 2016 -0400 ---------------------------------------------------------------------- .../usergrid/persistence/qakka/QakkaFig.java | 4 +- .../persistence/qakka/api/QueueResource.java | 5 + .../qakka/core/QueueMessageManager.java | 5 +- .../core/impl/QueueMessageManagerImpl.java | 88 +++++++++--------- .../distributed/actors/QueueRefresher.java | 4 +- .../impl/DistributedQueueServiceImpl.java | 22 +---- .../MultiShardMessageIterator.java | 2 +- .../MessageCounterSerialization.java | 2 + .../QueueMessageSerialization.java | 9 +- .../impl/MessageCounterSerializationImpl.java | 42 +++++++++ .../impl/QueueMessageSerializationImpl.java | 93 ++++++++++++++++++- .../queues/impl/QueueSerializationImpl.java | 10 +- .../sharding/ShardCounterSerialization.java | 6 +- .../sharding/ShardSerialization.java | 2 + .../impl/ShardCounterSerializationImpl.java | 53 ++++++++++- .../sharding/impl/ShardSerializationImpl.java | 23 ++++- .../persistence/queue/LegacyQueueManager.java | 1 - .../queue/impl/QakkaQueueManager.java | 3 +- .../qakka/core/QueueMessageManagerTest.java | 98 ++++++++++++++++++-- .../distributed/QueueActorServiceTest.java | 12 ++- .../rest/system/QueueSystemResource.java | 81 +++++++++++++--- 21 files changed, 456 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java index e6a8667..87b5c83 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -109,7 +109,7 @@ public interface QakkaFig extends GuicyFig, Serializable { /** How long to wait for response from queue actor before timing out and trying again */ @Key(QUEUE_GET_TIMEOUT) - @Default("1") + @Default("3") int getGetTimeoutSeconds(); /** Max number of times to retry call to queue writer for queue send operation */ @@ -119,7 +119,7 @@ public interface QakkaFig extends GuicyFig, Serializable { /** How long to wait for response from queue writer before timing out and trying again */ @Key(QUEUE_SEND_TIMEOUT) - @Default("2") + @Default("5") int getSendTimeoutSeconds(); /** Once counter reaches this value, write it to permanent storage */ http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java index b609de3..ade5a70 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java @@ -23,11 +23,13 @@ import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; import com.google.common.base.Preconditions; import com.google.common.io.ByteStreams; import org.apache.usergrid.persistence.qakka.core.*; +import org.apache.usergrid.persistence.qakka.exceptions.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.inject.Inject; import javax.ws.rs.*; +import javax.ws.rs.NotFoundException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; @@ -256,6 +258,9 @@ public class QueueResource { String contentType, ByteBuffer byteBuffer) { + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue " + queueName + " not found" ) ; + } Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java index b540fce..252dc22 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java @@ -83,5 +83,8 @@ public interface QueueMessageManager { */ QueueMessage getMessage(String queueName, UUID queueMessageId); - long getQueueDepth(String queueName); + /** + * Get queue depth for specified type, messages 'default' (available) or 'inflight' + */ + long getQueueDepth( String queueName, DatabaseQueueMessage.Type type ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java index 59e0ce0..59a14bd 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java @@ -23,10 +23,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.qakka.api.URIStrategy; -import org.apache.usergrid.persistence.qakka.core.QakkaUtils; -import org.apache.usergrid.persistence.qakka.core.QueueManager; -import org.apache.usergrid.persistence.qakka.core.QueueMessage; -import org.apache.usergrid.persistence.qakka.core.QueueMessageManager; +import org.apache.usergrid.persistence.qakka.core.*; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; import org.apache.usergrid.persistence.qakka.exceptions.BadRequestException; import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; @@ -35,6 +32,9 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Databas import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization; import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,10 +42,7 @@ import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; import java.net.URISyntaxException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.UUID; +import java.util.*; @Singleton @@ -53,32 +50,37 @@ public class QueueMessageManagerImpl implements QueueMessageManager { private static final Logger logger = LoggerFactory.getLogger( QueueMessageManagerImpl.class ); - private final ActorSystemFig actorSystemFig; - private final QueueManager queueManager; - private final QueueMessageSerialization queueMessageSerialization; - private final DistributedQueueService distributedQueueService; - private final TransferLogSerialization transferLogSerialization; - private final URIStrategy uriStrategy; + private final ActorSystemFig actorSystemFig; + private final QueueManager queueManager; + private final QueueMessageSerialization queueMessageSerialization; + private final DistributedQueueService distributedQueueService; + private final TransferLogSerialization transferLogSerialization; + private final URIStrategy uriStrategy; private final MessageCounterSerialization messageCounterSerialization; - + private final ShardSerialization shardSerialization; + private final CassandraClient cassandraClient; @Inject public QueueMessageManagerImpl( - ActorSystemFig actorSystemFig, - QueueManager queueManager, - QueueMessageSerialization queueMessageSerialization, - DistributedQueueService distributedQueueService, - TransferLogSerialization transferLogSerialization, - URIStrategy uriStrategy, - MessageCounterSerialization messageCounterSerialization ) { - - this.actorSystemFig = actorSystemFig; - this.queueManager = queueManager; - this.queueMessageSerialization = queueMessageSerialization; - this.distributedQueueService = distributedQueueService; - this.transferLogSerialization = transferLogSerialization; - this.uriStrategy = uriStrategy; + ActorSystemFig actorSystemFig, + QueueManager queueManager, + QueueMessageSerialization queueMessageSerialization, + DistributedQueueService distributedQueueService, + TransferLogSerialization transferLogSerialization, + URIStrategy uriStrategy, + MessageCounterSerialization messageCounterSerialization, + ShardSerialization shardSerialization, + CassandraClient cassandraClient ) { + + this.actorSystemFig = actorSystemFig; + this.queueManager = queueManager; + this.queueMessageSerialization = queueMessageSerialization; + this.distributedQueueService = distributedQueueService; + this.transferLogSerialization = transferLogSerialization; + this.uriStrategy = uriStrategy; this.messageCounterSerialization = messageCounterSerialization; + this.shardSerialization = shardSerialization; + this.cassandraClient = cassandraClient; } @@ -86,6 +88,10 @@ public class QueueMessageManagerImpl implements QueueMessageManager { public void sendMessages(String queueName, List<String> destinationRegions, Long delayMs, Long expirationSecs, String contentType, ByteBuffer messageData) { + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue " + queueName + " not found" ); + } + // TODO: implement delay and expiration // Preconditions.checkArgument(delayMs == null || delayMs > 0L, @@ -93,10 +99,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager { // Preconditions.checkArgument(expirationSecs == null || expirationSecs > 0L, // "Expiration seconds must be greater than zero"); - if ( queueManager.getQueueConfig( queueName ) == null ) { - throw new NotFoundException( "Queue not found: " + queueName ); - } - // get current time Long currentTimeMs = System.currentTimeMillis(); @@ -229,16 +231,16 @@ public class QueueMessageManagerImpl implements QueueMessageManager { } - @Override - public void clearMessages(String queueName) { - - if ( queueManager.getQueueConfig( queueName ) == null ) { - throw new NotFoundException( "Queue not found: " + queueName ); - } + // TODO: implement delete of message data too +// @Override +// public void clearMessageData( queueName ) { +// } - // TODO: implement clearMessages - throw new UnsupportedOperationException( "clearMessages not yet implemented" ); + @Override + public void clearMessages( String queueName ) { + queueMessageSerialization.deleteAllMessages( queueName ); + shardSerialization.deleteAllShards( queueName, actorSystemFig.getRegionLocal() ); } @@ -291,8 +293,8 @@ public class QueueMessageManagerImpl implements QueueMessageManager { @Override - public long getQueueDepth(String queueName) { - return messageCounterSerialization.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ); + public long getQueueDepth( String queueName, DatabaseQueueMessage.Type type ) { + return messageCounterSerialization.getCounterValue( queueName, type ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java index d8faeb2..509ccd9 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java @@ -129,8 +129,8 @@ public class QueueRefresher extends UntypedActor { startingShards.put( shardKey, shardId ); -// logger.debug("Refreshed queue {} region {} shard {} since {} found {}", -// queueName, region, shardId, since, count ); + logger.debug("Refreshed queue {} region {} shard {} since {} found {}", + queueName, region, shardId, since, count ); } } finally { http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index e2c5c2c..51f6fd3 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -160,10 +160,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_TOTAL ).time(); try { - if ( queueManager.getQueueConfig( queueName ) == null ) { - throw new NotFoundException( "Queue not found: " + queueName ); - } - int maxRetries = qakkaFig.getMaxSendRetries(); int retries = 0; @@ -255,10 +251,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { public Collection<DatabaseQueueMessage> getNextMessagesInternal( String queueName, int count ) { - if ( queueManager.getQueueConfig( queueName ) == null ) { - throw new NotFoundException( "Queue not found: " + queueName ); - } - if ( actorSystemManager.getClientActor() == null || !actorSystemManager.isReady() ) { logger.error("Akka Actor System is not ready yet for requests."); return Collections.EMPTY_LIST; @@ -288,7 +280,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { logger.warn( "getNextMessage {} SUCCESS after {} tries", queueName, tries ); } } - logger.debug("Returning queue {} messages {}", queueName, qprm.getQueueMessages().size()); + logger.trace("Returning queue {} messages {}", queueName, qprm.getQueueMessages().size()); return qprm.getQueueMessages(); @@ -325,10 +317,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_TOTAL ).time(); try { - if ( queueManager.getQueueConfig( queueName ) == null ) { - throw new NotFoundException( "Queue not found: " + queueName ); - } - QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId ); return sendMessageToLocalRouters( message ); @@ -342,10 +330,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { @Override public Status requeueMessage(String queueName, UUID messageId) { - if ( queueManager.getQueueConfig( queueName ) == null ) { - throw new NotFoundException( "Queue not found: " + queueName ); - } - QueueAckRequest message = new QueueAckRequest( queueName, messageId ); return sendMessageToLocalRouters( message ); } @@ -354,10 +338,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { @Override public Status clearMessages(String queueName) { - if ( queueManager.getQueueConfig( queueName ) == null ) { - throw new NotFoundException( "Queue not found: " + queueName ); - } - // TODO: implement clear queue throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java index 6a066e9..c521cef 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java @@ -148,7 +148,7 @@ public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage> List<Row> rows = cassandraClient.getQueueMessageSession().execute(query).all(); - //logger.debug("Query got {}: {}", rows.size(), query); + logger.trace("results {} from query {}", rows.size(), query.toString()); if ( (rows == null || rows.size() == 0) && shardIterator.hasNext()) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java index 6c81863..64b2fce 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java @@ -28,4 +28,6 @@ public interface MessageCounterSerialization extends Migration { void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long decrement); long getCounterValue(String name, DatabaseQueueMessage.Type type); + + void resetCounter(String queueName, DatabaseQueueMessage.Type type); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java index 434c965..0791e5c 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java @@ -27,7 +27,7 @@ import java.util.UUID; public interface QueueMessageSerialization extends Migration { /** - * Write message to storage.. + * Write message to storage. * If queueMessageId or createdTime are null, then values will be generated. */ UUID writeMessage(final DatabaseQueueMessage message); @@ -58,6 +58,13 @@ public interface QueueMessageSerialization extends Migration { void putInflight( DatabaseQueueMessage queueMessage ); /** + * Delete all queue messages in the specified queue and in the current "local" region. + * Impacts messages available and messages inflight. + * @param queueName Name of queue to clear. + */ + void deleteAllMessages( String queueName ); + + /** * Remove message from inflight table, write message to available table. */ void timeoutInflight( DatabaseQueueMessage queueMessage ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java index 36175c5..5c66f43 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java @@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,6 +104,11 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat increment.set( 0L ); decrement.set( 0L ); } + void reset() { + this.baseCount = 0; + this.increment.set( 0L ); + this.decrement.set( 0L ); + } public long value() { // return totalInMemoryCount.get(); // for testing using just in-memory counter: @@ -226,6 +232,42 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat } + @Override + public void resetCounter(String queueName, DatabaseQueueMessage.Type type) { + + // this sucks: "You cannot index, delete, or re-add a counter column" + // https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html + // so instead we decrement or increment the counter to zero + + // get value first, before resetting in memory counter + long value = getCounterValue( queueName, type ); + + String key = buildKey( queueName, type ); + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + if ( inMemoryCount != null ) { + inMemoryCount.reset(); + } + + if ( value < 0 ) { + + Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS ) + .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) ) + .and( QueryBuilder.eq( COLUMN_MESSAGE_TYPE, type.toString() ) ) + .with( QueryBuilder.incr( COLUMN_COUNTER_VALUE, -1 * value ) ); // incr must be positive + cassandraClient.getQueueMessageSession().execute( update ); + + } else { + + Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS ) + .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) ) + .and( QueryBuilder.eq( COLUMN_MESSAGE_TYPE, type.toString() ) ) + .with( QueryBuilder.decr( COLUMN_COUNTER_VALUE, value ) ); + cassandraClient.getQueueMessageSession().execute( update ); + } + + } + + void incrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long increment ) { Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS ) http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java index a174dd0..b1b57ae 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java @@ -41,14 +41,13 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Message import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.java8.FuturesConvertersImpl; -import java.util.Collection; -import java.util.Collections; -import java.util.UUID; +import java.util.*; public class QueueMessageSerializationImpl implements QueueMessageSerialization { @@ -136,6 +135,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization @Override public UUID writeMessage(final DatabaseQueueMessage message) { + logger.trace("write message {}", message.getQueueMessageId()); + final UUID queueMessageId = message.getQueueMessageId() == null ? QakkaUtils.getTimeUuid() : message.getQueueMessageId(); @@ -151,8 +152,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization Statement insert = createWriteMessageStatement( message ); cassandraClient.getQueueMessageSession().execute(insert); -// logger.debug("Wrote queue {} queue message {} shardId {}", -// message.getQueueName(), message.getQueueMessageId(), message.getShardId() ); + logger.trace("Wrote queue {} queue message {} shardId {}", + message.getQueueName(), message.getQueueMessageId(), message.getShardId() ); shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 ); @@ -174,6 +175,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization return null; } + logger.trace("loadMessage {}", queueMessageId); + final long shardId; if ( shardIdOrNull == null ) { Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ? @@ -223,6 +226,9 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization final DatabaseQueueMessage.Type type, final UUID queueMessageId ) { + + logger.trace("deleteMessage {}", queueMessageId); + Statement delete = createDeleteMessageStatement( queueName, region, null, type,queueMessageId); cassandraClient.getQueueMessageSession().execute( delete ); @@ -233,6 +239,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization @Override public DatabaseQueueMessageBody loadMessageData(final UUID messageId ){ + logger.trace("loadMessageData {}", messageId); + Clause messageIdClause = QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ); Statement select = QueryBuilder.select().from( TABLE_MESSAGE_DATA).where(messageIdClause); @@ -252,6 +260,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization public void writeMessageData( final UUID messageId, final DatabaseQueueMessageBody messageBody ) { Preconditions.checkArgument(QakkaUtils.isTimeUuid(messageId), "MessageId is not a type 1 UUID"); + logger.trace("writeMessageData {}", messageId); + Statement insert = QueryBuilder.insertInto(TABLE_MESSAGE_DATA) .value( COLUMN_MESSAGE_ID, messageId) .value( COLUMN_MESSAGE_DATA, messageBody.getBlob()) @@ -265,6 +275,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization @Override public void deleteMessageData( final UUID messageId ) { + logger.trace("deleteMessageData {}", messageId); + Clause messageIdClause = QueryBuilder.eq(COLUMN_MESSAGE_ID, messageId); Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA).where(messageIdClause); @@ -275,6 +287,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization @Override public void putInflight( DatabaseQueueMessage message ) { + logger.trace("putInflight {}", message.getQueueMessageId()); + // create statement to write queue message to inflight table DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage( @@ -319,8 +333,77 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization @Override + public void deleteAllMessages( String queueName ) { + + logger.trace("deleteAllMessages " + queueName); + + Shard.Type[] shardTypes = new Shard.Type[] {Shard.Type.DEFAULT, Shard.Type.INFLIGHT}; + + // batch up and then execute delete statements + BatchStatement deleteAllBatch = new BatchStatement(); + for ( Shard.Type shardType : shardTypes ) { + ShardIterator defaultShardIterator = new ShardIterator( cassandraClient, + queueName, actorSystemFig.getRegionLocal(), shardType, Optional.empty() ); + + while (defaultShardIterator.hasNext()) { + Shard shard = defaultShardIterator.next(); + deleteAllBatch.add( createDeleteAllMessagesStatement( shard ) ); + + logger.trace("added queueName {} type {} shard {}", + queueName, shardType, shard.getShardId() ); + } + } + + cassandraClient.getQueueMessageSession().execute( deleteAllBatch ); + logger.trace("deleted messages in queue: " + queueName); + + // clear counters, we only want to this to happen after successful deletion + for ( Shard.Type shardType : shardTypes ) { + + ShardIterator defaultShardIterator = new ShardIterator( cassandraClient, + queueName, actorSystemFig.getRegionLocal(), shardType, Optional.empty() ); + + while (defaultShardIterator.hasNext()) { + Shard shard = defaultShardIterator.next(); + + shardCounterSerialization.resetCounter( shard ); + + DatabaseQueueMessage.Type type = Shard.Type.DEFAULT.equals( shardType ) + ? DatabaseQueueMessage.Type.DEFAULT : DatabaseQueueMessage.Type.INFLIGHT; + messageCounterSerialization.resetCounter( queueName, type ); + + logger.trace("reset counters for queueName {} type {} shard {}", + queueName, shardType, shard.getShardId() ); + } + } + + // TODO: delete message data (separate method) + } + + + private Statement createDeleteAllMessagesStatement( Shard shard ) { + + Clause queueNameClause = QueryBuilder.eq( COLUMN_QUEUE_NAME, shard.getQueueName() ); + Clause regionClause = QueryBuilder.eq( COLUMN_REGION, shard.getRegion() ); + Clause shardIdClause = QueryBuilder.eq( COLUMN_SHARD_ID, shard.getShardId() ); + + DatabaseQueueMessage.Type dbqmType = Shard.Type.DEFAULT.equals( shard.getType() ) + ? DatabaseQueueMessage.Type.DEFAULT : DatabaseQueueMessage.Type.INFLIGHT; + + Statement deleteAll = QueryBuilder.delete().from( getTableName( dbqmType )) + .where(queueNameClause) + .and(regionClause) + .and(shardIdClause); + + return deleteAll; + } + + + @Override public void timeoutInflight( DatabaseQueueMessage message ) { + logger.trace("timeoutInflight {}", message.getQueueMessageId() ); + // create statement to write queue message back to available table, with new UUID UUID newQueueMessageId = QakkaUtils.getTimeUuid(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java index 17a48c6..d3a46aa 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java @@ -44,7 +44,7 @@ import java.util.stream.Collectors; public class QueueSerializationImpl implements QueueSerialization { - private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class ); + private static final Logger logger = LoggerFactory.getLogger( QueueSerializationImpl.class ); private final CassandraClient cassandraClient; private final CassandraConfig cassandraConfig; @@ -83,6 +83,8 @@ public class QueueSerializationImpl implements QueueSerialization { @Override public void writeQueue(DatabaseQueue queue) { + logger.trace( "writeQueue " + queue.getName() ); + Statement insert = QueryBuilder.insertInto(TABLE_QUEUES) .value(COLUMN_QUEUE_NAME, queue.getName()) .value(COLUMN_REGIONS, queue.getRegions()) @@ -100,6 +102,8 @@ public class QueueSerializationImpl implements QueueSerialization { @Override public DatabaseQueue getQueue(String name) { + logger.trace( "getQueue " + name ); + Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, name); Statement query = QueryBuilder.select().all().from(TABLE_QUEUES) @@ -127,6 +131,8 @@ public class QueueSerializationImpl implements QueueSerialization { @Override public void deleteQueue(String name) { + logger.trace( "deleteQueue " + name ); + Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, name); Statement delete = QueryBuilder.delete().from(TABLE_QUEUES) @@ -138,6 +144,8 @@ public class QueueSerializationImpl implements QueueSerialization { @Override public List<String> getListOfQueues() { + logger.trace( "getListOfQueues " ); + Statement select = QueryBuilder.select().all().from( TABLE_QUEUES ); ResultSet rs = cassandraClient.getApplicationSession().execute( select ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java index c29c548..0e5e279 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java @@ -23,7 +23,9 @@ import org.apache.usergrid.persistence.core.migration.schema.Migration; public interface ShardCounterSerialization extends Migration { - void incrementCounter(String queueName, Shard.Type type, long shardId, long increment); + void incrementCounter( String queueName, Shard.Type type, long shardId, long increment); - long getCounterValue(String name, Shard.Type type, long shardId); + long getCounterValue( String name, Shard.Type type, long shardId); + + void resetCounter( Shard shard ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java index c0173ab..91fe0e1 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java @@ -30,6 +30,8 @@ public interface ShardSerialization extends Migration { void deleteShard(final Shard shard); + void deleteAllShards(String queueName, String region); + void updateShardPointer(final Shard shard); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java index 276498d..8bb262b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java @@ -55,16 +55,16 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization final static String TABLE_COUNTERS = "shard_counters"; final static String COLUMN_QUEUE_NAME = "queue_name"; + final static String COLUMN_SHARD_TYPE = "shard_type"; final static String COLUMN_SHARD_ID = "shard_id"; final static String COLUMN_COUNTER_VALUE = "counter_value"; - final static String COLUMN_SHARD_TYPE = "shard_type"; static final String CQL = "CREATE TABLE IF NOT EXISTS shard_counters ( " + - "counter_value counter, " + "queue_name varchar, " + "shard_type varchar, " + - "shard_id bigint, " + + "shard_id bigint, " + + "counter_value counter, " + "PRIMARY KEY (queue_name, shard_type, shard_id) " + "); "; @@ -85,6 +85,10 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization void setBaseCount( long baseCount ) { this.baseCount = baseCount; } + void reset() { + this.baseCount = 0; + this.increment.set( 0L ); + } } private Map<String, InMemoryCount> inMemoryCounters = new ConcurrentHashMap<>(200); @@ -159,6 +163,49 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization return inMemoryCounters.get( key ).value(); } + + @Override + public void resetCounter( Shard shard ) { + + // this sucks: "You cannot index, delete, or re-add a counter column" + // https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html + // so instead we decrement or increment the counter to zero + + String queueName = shard.getQueueName(); + Shard.Type type = shard.getType(); + long shardId = shard.getShardId(); + + // get value first, before resetting in memory counter + long value = getCounterValue( shard.getQueueName(), shard.getType(), shard.getShardId() ); + + String key = queueName + type + shardId; + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + if ( inMemoryCount != null ) { + inMemoryCount.reset(); + } + + if ( value < 0 ) { + + Statement update = QueryBuilder.update( TABLE_COUNTERS ) + .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) ) + .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString() ) ) + .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) ) + .with( QueryBuilder.incr( COLUMN_COUNTER_VALUE, -1 * value ) ); // incr must be positive + cassandraClient.getQueueMessageSession().execute( update ); + + } else { + + Statement update = QueryBuilder.update( TABLE_COUNTERS ) + .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) ) + .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString() ) ) + .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) ) + .with( QueryBuilder.decr( COLUMN_COUNTER_VALUE, value ) ); + cassandraClient.getQueueMessageSession().execute( update ); + } + + } + + void incrementCounterInStorage( String queueName, Shard.Type type, long shardId, long increment ) { Statement update = QueryBuilder.update( TABLE_COUNTERS ) http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java index cc5caab..501a6ca 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java @@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.qakka.serialization.sharding.impl; +import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.Assignment; @@ -140,8 +141,6 @@ public class ShardSerializationImpl implements ShardSerialization { Clause activeClause = QueryBuilder.eq(COLUMN_ACTIVE, 1); Clause shardIdClause = QueryBuilder.eq(COLUMN_SHARD_ID, shard.getShardId()); - - Statement delete = QueryBuilder.delete().from(getTableName(shard.getType())) .where(queueNameClause) .and(regionClause) @@ -149,9 +148,29 @@ public class ShardSerializationImpl implements ShardSerialization { .and(shardIdClause); cassandraClient.getQueueMessageSession().execute(delete); + } + + + @Override + public void deleteAllShards(String queueName, String region) { + + BatchStatement batch = new BatchStatement(); + Shard.Type[] shardTypes = new Shard.Type[]{Shard.Type.DEFAULT, Shard.Type.INFLIGHT}; + + for (Shard.Type shardType : shardTypes) { + + Statement delete = QueryBuilder.delete().from( getTableName( shardType ) ) + .where( QueryBuilder.eq(COLUMN_QUEUE_NAME, queueName) ) + .and( QueryBuilder.eq(COLUMN_REGION, region) ); + + batch.add( delete ); + } + + cassandraClient.getQueueMessageSession().execute( batch ); } + public void updateShardPointer(final Shard shard){ Assignment assignment = QueryBuilder.set(COLUMN_POINTER, shard.getPointer()); http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java index 053dd36..e38d3bc 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java @@ -36,7 +36,6 @@ public interface LegacyQueueManager { /** * get the queue depth - * @return */ long getQueueDepth(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java index 2983340..b81e888 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java @@ -24,6 +24,7 @@ import com.google.inject.assistedinject.Assisted; import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.*; import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.apache.usergrid.persistence.queue.LegacyQueueFig; import org.apache.usergrid.persistence.queue.LegacyQueueManager; @@ -145,7 +146,7 @@ public class QakkaQueueManager implements LegacyQueueManager { createQueueIfNecessary(); - return queueMessageManager.getQueueDepth( scope.getName() ); + return queueMessageManager.getQueueDepth( scope.getName(), DatabaseQueueMessage.Type.DEFAULT ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java index 5f0216f..d944b3d 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java @@ -38,6 +38,10 @@ import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSeri import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; +import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardCounterSerializationImpl; import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLog; import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization; import org.apache.usergrid.persistence.queue.TestModule; @@ -47,10 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.stream.Collectors; @@ -175,17 +176,19 @@ public class QueueMessageManagerTest extends AbstractTest { DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) ); } + DatabaseQueueMessage.Type type = DatabaseQueueMessage.Type.DEFAULT; + int maxRetries = 15; int retries = 0; while (retries++ < maxRetries) { distributedQueueService.refresh(); - if (qmm.getQueueDepth( queueName ) == 40) { + if (qmm.getQueueDepth( queueName, type ) == numMessages) { break; } Thread.sleep( 500 ); } - Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) ); + Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName, type ) ); // get all messages from queue @@ -295,4 +298,87 @@ public class QueueMessageManagerTest extends AbstractTest { distributedQueueService.shutdown(); } + + @Test + public void testClearQueue() throws Exception { + + Injector injector = getInjector(); + + DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); + ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class ); + + String region = actorSystemFig.getRegionLocal(); + App app = injector.getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + + // create some number of queue messages + + QueueManager queueManager = injector.getInstance( QueueManager.class ); + + String queueName = "queue_testClearQueue" + RandomStringUtils.randomAlphanumeric( 15 ); + + try { + QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class ); + queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) ); + + int numMessages = 40; + + for (int i = 0; i < numMessages; i++) { + qmm.sendMessages( + queueName, + Collections.singletonList( region ), + null, // delay + null, // expiration + "application/json", + DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) ); + } + + DatabaseQueueMessage.Type available = DatabaseQueueMessage.Type.DEFAULT; + DatabaseQueueMessage.Type inflight = DatabaseQueueMessage.Type.INFLIGHT; + + int maxRetries = 15; + int retries = 0; + while (retries++ < maxRetries) { + distributedQueueService.refresh(); + if (qmm.getQueueDepth( queueName, available ) == numMessages) { + break; + } + Thread.sleep( 500 ); + } + + Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName, available ) ); + Assert.assertEquals( 0, qmm.getQueueDepth( queueName, inflight ) ); + + // get half of the messages from the queue + List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages/2 ); + Assert.assertEquals( numMessages/2, messages.size() ); + + Thread.sleep(500); + + // now half messages should be available and half in flight + Assert.assertEquals( numMessages/2, qmm.getQueueDepth( queueName, available ) ); + Assert.assertEquals( numMessages/2, qmm.getQueueDepth( queueName, inflight ) ); + + qmm.clearMessages( queueName ); + + // counters should show zero in queue + Assert.assertEquals( 0, qmm.getQueueDepth( queueName, available ) ); + Assert.assertEquals( 0, qmm.getQueueDepth( queueName, inflight ) ); + + // TODO: check that all shards are gone + + CassandraClient cassandraClient = injector.getInstance( CassandraClient.class ); + + ShardIterator defaultShardIterator = new ShardIterator( cassandraClient, + queueName, actorSystemFig.getRegionLocal(), Shard.Type.DEFAULT, Optional.empty() ); + Assert.assertTrue( !defaultShardIterator.hasNext() ); + + ShardIterator inflightShardIterator = new ShardIterator( cassandraClient, + queueName, actorSystemFig.getRegionLocal(), Shard.Type.INFLIGHT, Optional.empty() ); + Assert.assertTrue( !inflightShardIterator.hasNext() ); + + } finally { + queueManager.deleteQueue( queueName ); + } + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java index f5512e5..053c093 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java @@ -152,12 +152,14 @@ public class QueueActorServiceTest extends AbstractTest { queueName, region, region, messageId, null, null ); } + DatabaseQueueMessage.Type type = DatabaseQueueMessage.Type.DEFAULT; + int maxRetries = 10; int retries = 0; long count = 0; while (retries++ < maxRetries) { distributedQueueService.refresh(); - count = queueMessageManager.getQueueDepth( queueName ); + count = queueMessageManager.getQueueDepth( queueName, type ); if ( count == 100 ) { break; } @@ -167,16 +169,16 @@ public class QueueActorServiceTest extends AbstractTest { Assert.assertEquals( 100, count ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 75, queueMessageManager.getQueueDepth( queueName ) ); + Assert.assertEquals( 75, queueMessageManager.getQueueDepth( queueName, type ) ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 50, queueMessageManager.getQueueDepth( queueName ) ); + Assert.assertEquals( 50, queueMessageManager.getQueueDepth( queueName, type ) ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 25, queueMessageManager.getQueueDepth( queueName ) ); + Assert.assertEquals( 25, queueMessageManager.getQueueDepth( queueName, type ) ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 0, queueMessageManager.getQueueDepth( queueName ) ); + Assert.assertEquals( 0, queueMessageManager.getQueueDepth( queueName, type ) ); distributedQueueService.shutdown(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/19805622/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java index 86cd387..a9d3e1b 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java @@ -20,6 +20,7 @@ package org.apache.usergrid.rest.system; import com.codahale.metrics.Timer; +import com.fasterxml.jackson.databind.node.BooleanNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; @@ -31,6 +32,7 @@ import org.apache.usergrid.persistence.qakka.core.QueueMessageManager; import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; import org.apache.usergrid.persistence.qakka.core.impl.QueueManagerImpl; import org.apache.usergrid.persistence.qakka.core.impl.QueueMessageManagerImpl; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; import org.apache.usergrid.rest.AbstractContextResource; import org.apache.usergrid.rest.ApiResponse; import org.apache.usergrid.rest.security.annotations.RequireSystemAccess; @@ -91,12 +93,11 @@ public class QueueSystemResource extends AbstractContextResource { @GET @RequireSystemAccess - @Path("info") - public ApiResponse getQueueInfo( - @QueryParam("callback") @DefaultValue("callback") String callback ) { + @Path("metrics") + public ApiResponse getQueueMetrics() { ApiResponse response = createApiResponse(); - response.setAction( "get queue info" ); + response.setAction( "get queue metrics" ); MetricsService metricsService = injector.getInstance( MetricsService.class ); @@ -104,7 +105,7 @@ public class QueueSystemResource extends AbstractContextResource { final long nano = 1000000000; Map<String, Object> info = new HashMap<String, Object>() {{ - put( "name", "Queue Info" ); + put( "name", "Queue Metrics" ); try { put( "host", InetAddress.getLocalHost().getHostName() ); } catch (UnknownHostException e) { @@ -125,6 +126,25 @@ public class QueueSystemResource extends AbstractContextResource { } }}; + response.setProperty( "data", info ); + + return response; + + } + + @GET + @RequireSystemAccess + @Path("info") + public ApiResponse getQueueInfo( + @QueryParam("callback") @DefaultValue("callback") String callback ) { + + ApiResponse response = createApiResponse(); + response.setAction( "get queue info" ); + + Map<String, Object> info = new HashMap<String, Object>() {{ + put( "name", "Queue Info" ); + }}; + QueueManager queueManager = injector.getInstance( QueueManagerImpl.class ); QueueMessageManager queueMessageManager = injector.getInstance( QueueMessageManagerImpl.class ); InMemoryQueue inMemoryQueue = injector.getInstance( InMemoryQueue.class ); @@ -133,16 +153,23 @@ public class QueueSystemResource extends AbstractContextResource { final List<String> listOfQueues = queueManager.getListOfQueues(); for ( String queueName : listOfQueues ) { - Map<String, Object> queueInfo = new HashMap<>(); + try { + Map<String, Object> queueInfo = new HashMap<>(); - queueInfo.put("name", queueName ); - queueInfo.put("depth", queueMessageManager.getQueueDepth( queueName )); - queueInfo.put("inmemory", inMemoryQueue.size( queueName )); + queueInfo.put( "name", queueName ); + queueInfo.put( "depth", + queueMessageManager.getQueueDepth( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + queueInfo.put( "inflight", + queueMessageManager.getQueueDepth( queueName, DatabaseQueueMessage.Type.INFLIGHT ) ); + queueInfo.put( "inmemory", inMemoryQueue.size( queueName ) ); - UUID newest = inMemoryQueue.getNewest( queueName ); - queueInfo.put("since", newest == null ? "null" : newest.timestamp()); + UUID newest = inMemoryQueue.getNewest( queueName ); + queueInfo.put( "since", newest == null ? "null" : newest.timestamp() ); - queues.add( queueInfo ); + queues.add( queueInfo ); + } catch ( Exception e ) { + logger.error("Error getting queue info for queue: " + queueName, e); + } } info.put("queues", queues); @@ -152,4 +179,34 @@ public class QueueSystemResource extends AbstractContextResource { return response; } + + @POST + @RequireSystemAccess + @Path("clear/{queueName}") + public ApiResponse clearQueue( + @PathParam("queueName") String queueName, + @QueryParam("callback") @DefaultValue("callback") String callback ) { + + logger.debug("DMJ_TEMP clearQueue"); + + QueueManager queueManager = injector.getInstance( QueueManager.class ); + QueueMessageManager queueMessageManager = injector.getInstance( QueueMessageManagerImpl.class ); + + if ( queueName == null ) { + throw new IllegalArgumentException( "queueName net specified in path" ); + } + + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue not found: " + queueName ); + } + + ApiResponse response = createApiResponse(); + response.setAction( "clear queue: " + queueName ); + + queueMessageManager.clearMessages( queueName ); + + response.setProperty( "cleared", Boolean.TRUE ); + return response; + } + }