Change to use proper Guice injection instead of static injector kludge.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5a19ba9a Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5a19ba9a Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5a19ba9a Branch: refs/heads/master Commit: 5a19ba9a748c5d96f5356d77df1e7845aa92fc0f Parents: 2cd8ecb Author: Dave Johnson <snoopd...@apache.org> Authored: Fri Sep 30 17:18:24 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Fri Sep 30 17:18:24 2016 -0400 ---------------------------------------------------------------------- build.log | 15 ++ .../apache/usergrid/persistence/qakka/App.java | 17 +- .../qakka/core/impl/InMemoryQueue.java | 11 +- .../core/impl/QueueMessageManagerImpl.java | 18 +- .../distributed/DistributedQueueService.java | 2 +- .../qakka/distributed/actors/QueueActor.java | 56 +++--- .../distributed/actors/QueueActorHelper.java | 105 ++++++++++- .../distributed/actors/QueueActorRouter.java | 11 +- .../distributed/actors/QueueRefresher.java | 120 +----------- .../qakka/distributed/actors/QueueSender.java | 5 +- .../distributed/actors/QueueSenderRouter.java | 10 +- .../distributed/actors/QueueTimeouter.java | 26 +-- .../qakka/distributed/actors/QueueWriter.java | 6 +- .../distributed/actors/QueueWriterRouter.java | 11 +- .../distributed/actors/ShardAllocator.java | 22 +-- .../impl/DistributedQueueServiceImpl.java | 26 ++- .../qakka/core/QueueMessageManagerTest.java | 178 +++++++++--------- .../distributed/QueueActorServiceTest.java | 112 ++++++----- .../actors/QueueActorHelperTest.java | 186 +++++++++++-------- .../distributed/actors/QueueReaderTest.java | 11 +- .../distributed/actors/QueueTimeouterTest.java | 7 +- .../distributed/actors/ShardAllocatorTest.java | 30 +-- .../queue/src/test/resources/log4j.properties | 6 +- 23 files changed, 531 insertions(+), 460 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/build.log ---------------------------------------------------------------------- diff --git a/build.log b/build.log new file mode 100644 index 0000000..43ffacd --- /dev/null +++ b/build.log @@ -0,0 +1,15 @@ +[INFO] Scanning for projects... +[INFO] ------------------------------------------------------------------------ +[INFO] BUILD FAILURE +[INFO] ------------------------------------------------------------------------ +[INFO] Total time: 0.086 s +[INFO] Finished at: 2016-09-30T07:54:28-04:00 +[INFO] Final Memory: 46M/6710M +[INFO] ------------------------------------------------------------------------ +[ERROR] The goal you specified requires a project to execute but there is no POM in this directory (/Users/ApigeeCorporation/src/usergrid-snoopdave). Please verify you invoked Maven from the correct directory. -> [Help 1] +[ERROR] +[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. +[ERROR] Re-run Maven using the -X switch to enable full debug logging. +[ERROR] +[ERROR] For more information about the errors and possible solutions, please read the following articles: +[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MissingProjectException http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java index abbf3da..35fdb20 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java @@ -44,9 +44,6 @@ import org.slf4j.LoggerFactory; public class App implements MetricsService { private static final Logger logger = LoggerFactory.getLogger( App.class ); - // TODO: can we avoid this kludge with better Akka-Guice integration? - static public Injector INJECTOR; - private final ActorSystemFig actorSystemFig; private final ActorSystemManager actorSystemManager; private final DistributedQueueService distributedQueueService; @@ -55,14 +52,16 @@ public class App implements MetricsService { @Inject public App( - Injector injector, QakkaFig qakkaFig, ActorSystemFig actorSystemFig, ActorSystemManager actorSystemManager, DistributedQueueService distributedQueueService, - MigrationManager migrationManager) { + MigrationManager migrationManager, + QueueActorRouterProducer queueActorRouterProducer, + QueueWriterRouterProducer queueWriterRouterProducer, + QueueSenderRouterProducer queueSenderRouterProducer + ) { - this.INJECTOR = injector; this.actorSystemFig = actorSystemFig; this.actorSystemManager = actorSystemManager; this.distributedQueueService = distributedQueueService; @@ -74,9 +73,9 @@ public class App implements MetricsService { } catch (MigrationException e) { throw new QakkaRuntimeException( "Error running migration", e ); } - actorSystemManager.registerRouterProducer( injector.getInstance( QueueActorRouterProducer.class ) ); - actorSystemManager.registerRouterProducer( injector.getInstance( QueueWriterRouterProducer.class ) ); - actorSystemManager.registerRouterProducer( injector.getInstance( QueueSenderRouterProducer.class ) ); + actorSystemManager.registerRouterProducer( queueActorRouterProducer ); + actorSystemManager.registerRouterProducer( queueWriterRouterProducer ); + actorSystemManager.registerRouterProducer( queueSenderRouterProducer ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java index 27de079..1f6fe6e 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java @@ -19,8 +19,10 @@ package org.apache.usergrid.persistence.qakka.core.impl; +import com.datastax.driver.core.utils.UUIDs; import com.google.inject.Inject; import com.google.inject.Singleton; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.distributed.actors.QueueRefresher; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; @@ -71,7 +73,14 @@ public class InMemoryQueue { } public UUID getNewest( String queueName ) { - return newestByQueueName.get( queueName ); + UUID newest = newestByQueueName.get( queueName ); +// if ( newest == null ) { +// // Create oldest UUID from a UNIX timestamp via DataStax utility +// // https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/utils/UUIDs.html +// newest = UUIDs.startOf( 0L ); +// newestByQueueName.put( queueName, newest ); +// } + return newest; } public DatabaseQueueMessage poll( String queueName ) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java index 691c1a6..59e0ce0 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java @@ -138,10 +138,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager { @Override public List<QueueMessage> getNextMessages(String queueName, int count) { - if ( queueManager.getQueueConfig( queueName ) == null ) { - throw new NotFoundException( "Queue not found: " + queueName ); - } - Collection<DatabaseQueueMessage> dbMessages = distributedQueueService.getNextMessages( queueName, count ); List<QueueMessage> queueMessages = joinMessages( queueName, dbMessages ); @@ -210,15 +206,14 @@ public class QueueMessageManagerImpl implements QueueMessageManager { @Override public void ackMessage(String queueName, UUID queueMessageId) { - if ( queueManager.getQueueConfig( queueName ) == null ) { - throw new NotFoundException( "Queue not found: " + queueName ); - } - DistributedQueueService.Status status = distributedQueueService.ackMessage( queueName, queueMessageId ); - if ( DistributedQueueService.Status.BAD_REQUEST.equals( status )) { + if ( DistributedQueueService.Status.NOT_INFLIGHT.equals( status )) { throw new BadRequestException( "Message not inflight" ); + } else if ( DistributedQueueService.Status.BAD_REQUEST.equals( status )) { + throw new BadRequestException( "Bad request" ); + } else if ( DistributedQueueService.Status.ERROR.equals( status )) { throw new QakkaRuntimeException( "Unable to ack message due to error" ); } @@ -228,10 +223,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager { @Override public void requeueMessage(String queueName, UUID messageId, Long delayMs) { - if ( queueManager.getQueueConfig( queueName ) == null ) { - throw new NotFoundException( "Queue not found: " + queueName ); - } - // TODO: implement requeueMessage throw new UnsupportedOperationException( "requeueMessage not yet implemented" ); @@ -268,7 +259,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager { // first look in INFLIGHT storage - DatabaseQueueMessage dbMessage = queueMessageSerialization.loadMessage( queueName, actorSystemFig.getRegionLocal(), null, DatabaseQueueMessage.Type.INFLIGHT, queueMessageId ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java index b02a623..b11dcff 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java @@ -30,7 +30,7 @@ import java.util.UUID; */ public interface DistributedQueueService { - enum Status { SUCCESS, ERROR, BAD_REQUEST }; + enum Status { SUCCESS, ERROR, BAD_REQUEST, NOT_INFLIGHT }; void init(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java index 87342ad..5ebba3d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java @@ -24,7 +24,9 @@ import akka.actor.Cancellable; import akka.actor.Props; import akka.actor.UntypedActor; import com.codahale.metrics.Timer; +import com.google.inject.Inject; import com.google.inject.Injector; +import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; import org.apache.usergrid.persistence.qakka.App; import org.apache.usergrid.persistence.qakka.MetricsService; import org.apache.usergrid.persistence.qakka.QakkaFig; @@ -65,10 +67,13 @@ public class QueueActor extends UntypedActor { private final AtomicLong messageCount = new AtomicLong(0); private final Set<String> queuesSeen = new HashSet<>(); + private final Injector injector; - public QueueActor() { - Injector injector = App.INJECTOR; + @Inject + public QueueActor( Injector injector ) { + + this.injector = injector; qakkaFig = injector.getInstance( QakkaFig.class ); inMemoryQueue = injector.getInstance( InMemoryQueue.class ); @@ -107,7 +112,7 @@ public class QueueActor extends UntypedActor { getContext().dispatcher(), getSelf()); timeoutSchedulersByQueueName.put( request.getQueueName(), scheduler ); - logger.debug("Created scheduler for queue {}", request.getQueueName() ); + logger.debug("Created timeouter for queue {}", request.getQueueName() ); } if ( shardAllocationSchedulersByQueueName.get( request.getQueueName() ) == null ) { @@ -126,18 +131,20 @@ public class QueueActor extends UntypedActor { QueueRefreshRequest request = (QueueRefreshRequest)message; queuesSeen.add( request.getQueueName() ); - - if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) { - - if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) { - ActorRef readerRef = getContext().actorOf( Props.create( - QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader"); - queueReadersByQueueName.put( request.getQueueName(), readerRef ); - } - } - - // hand-off to queue's reader - queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() ); + queueActorHelper.queueRefresh( request.getQueueName() ); + +// if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) { +// +// if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) { +// ActorRef readerRef = getContext().actorOf( +// Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ), +// request.getQueueName() + "_reader"); +// queueReadersByQueueName.put( request.getQueueName(), readerRef ); +// } +// } +// +// // hand-off to queue's reader +// queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() ); } else if ( message instanceof QueueTimeoutRequest ) { QueueTimeoutRequest request = (QueueTimeoutRequest)message; @@ -145,8 +152,9 @@ public class QueueActor extends UntypedActor { queuesSeen.add( request.getQueueName() ); if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) { - ActorRef readerRef = getContext().actorOf( Props.create( - QueueTimeouter.class, request.getQueueName()), request.getQueueName() + "_timeouter"); + ActorRef readerRef = getContext().actorOf( + Props.create( GuiceActorProducer.class, injector, QueueTimeouter.class), + request.getQueueName() + "_timeouter"); queueTimeoutersByQueueName.put( request.getQueueName(), readerRef ); } @@ -160,8 +168,9 @@ public class QueueActor extends UntypedActor { queuesSeen.add( request.getQueueName() ); if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) { - ActorRef readerRef = getContext().actorOf( Props.create( - ShardAllocator.class, request.getQueueName()), request.getQueueName() + "_shard_allocator"); + ActorRef readerRef = getContext().actorOf( + Props.create( GuiceActorProducer.class, injector, ShardAllocator.class), + request.getQueueName() + "_shard_allocator"); shardAllocatorsByQueueName.put( request.getQueueName(), readerRef ); } @@ -181,15 +190,15 @@ public class QueueActor extends UntypedActor { while (queueMessages.size() < queueGetRequest.getNumRequested()) { - DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() ); + DatabaseQueueMessage queueMessage = inMemoryQueue.peek( queueGetRequest.getQueueName() ); if (queueMessage != null) { if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) { queueMessages.add( queueMessage ); } } else { -// logger.debug("in-memory queue for {} is empty, object is: {}", -// queueGetRequest.getQueueName(), inMemoryQueue ); + logger.debug("in-memory queue for {} is empty, object is: {}", + queueGetRequest.getQueueName(), inMemoryQueue ); break; } } @@ -199,6 +208,9 @@ public class QueueActor extends UntypedActor { DatabaseQueueMessage.Type.DEFAULT, queueMessages.size()); + logger.debug("{} returning {} for queue {}", + this, queueMessages.size(), queueGetRequest.getQueueName()); + getSender().tell( new QueueGetResponse( DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java index 26db903..68250df 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java @@ -19,17 +19,28 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; +import com.codahale.metrics.Timer; import com.google.inject.Inject; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.qakka.MetricsService; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; +import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator; import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog; import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.text.DecimalFormat; +import java.util.Optional; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; public class QueueActorHelper { @@ -38,18 +49,33 @@ public class QueueActorHelper { private final ActorSystemFig actorSystemFig; private final QueueMessageSerialization messageSerialization; private final AuditLogSerialization auditLogSerialization; + private final InMemoryQueue inMemoryQueue; + private final QakkaFig qakkaFig; + private final MetricsService metricsService; + private final CassandraClient cassandraClient; + + private final AtomicLong runCount = new AtomicLong(0); + private final AtomicLong totalRead = new AtomicLong(0); @Inject public QueueActorHelper( - ActorSystemFig actorSystemFig, + QakkaFig qakkaFig, + ActorSystemFig actorSystemFig, QueueMessageSerialization messageSerialization, - AuditLogSerialization auditLogSerialization + AuditLogSerialization auditLogSerialization, + InMemoryQueue inMemoryQueue, + MetricsService metricsService, + CassandraClient cassandraClient ) { - this.actorSystemFig = actorSystemFig; - this.messageSerialization = messageSerialization; + this.actorSystemFig = actorSystemFig; + this.messageSerialization = messageSerialization; this.auditLogSerialization = auditLogSerialization; + this.inMemoryQueue = inMemoryQueue; + this.qakkaFig = qakkaFig; + this.metricsService = metricsService; + this.cassandraClient = cassandraClient; } @@ -78,7 +104,7 @@ public class QueueActorHelper { queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT ); if ( queueMessage == null ) { - return DistributedQueueService.Status.BAD_REQUEST; + return DistributedQueueService.Status.NOT_INFLIGHT; } boolean error = false; @@ -164,4 +190,73 @@ public class QueueActorHelper { return true; } + + void queueRefresh( String queueName ) { + + Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time(); + + try { + + if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) { + + ShardIterator shardIterator = new ShardIterator( + cassandraClient, queueName, actorSystemFig.getRegionLocal(), + Shard.Type.DEFAULT, Optional.empty() ); + + UUID since = inMemoryQueue.getNewest( queueName ); + + String region = actorSystemFig.getRegionLocal(); + MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( + cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT, + shardIterator, since); + + int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName ); + int count = 0; + + while ( multiShardIterator.hasNext() && count < need ) { + DatabaseQueueMessage queueMessage = multiShardIterator.next(); + inMemoryQueue.add( queueName, queueMessage ); + count++; + } + + long runs = runCount.incrementAndGet(); + long readCount = totalRead.addAndGet( count ); + + if ( logger.isDebugEnabled() && runs % 100 == 0 ) { + + final DecimalFormat format = new DecimalFormat("##.###"); + final long nano = 1000000000; + Timer t = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME ); + + logger.debug("QueueRefresher for queue '{}' stats:\n" + + " Num runs={}\n" + + " Read count={}\n" + + " Mean={}\n" + + " One min rate={}\n" + + " Five min rate={}\n" + + " Snapshot mean={}\n" + + " Snapshot min={}\n" + + " Snapshot max={}", + queueName, + t.getCount(), + readCount, + format.format( t.getMeanRate() ), + format.format( t.getOneMinuteRate() ), + format.format( t.getFiveMinuteRate() ), + format.format( t.getSnapshot().getMean() / nano ), + format.format( (double) t.getSnapshot().getMin() / nano ), + format.format( (double) t.getSnapshot().getMax() / nano ) ); + } + + if ( count > 0 ) { + logger.debug( "Added {} in-memory for queue {}, new size = {}", + count, queueName, inMemoryQueue.size( queueName ) ); + } + } + + } finally { + timer.close(); + } + + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java index 97e591c..9257a0d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java @@ -24,6 +24,9 @@ import akka.actor.Props; import akka.actor.UntypedActor; import akka.routing.ConsistentHashingRouter; import akka.routing.FromConfig; +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; import org.apache.usergrid.persistence.qakka.distributed.messages.*; @@ -35,9 +38,11 @@ public class QueueActorRouter extends UntypedActor { private final ActorRef routerRef; - public QueueActorRouter() { - routerRef = getContext().actorOf( - FromConfig.getInstance().props( Props.create(QueueActor.class)), "router"); + @Inject + public QueueActorRouter( Injector injector ) { + + this.routerRef = getContext().actorOf( FromConfig.getInstance().props( + Props.create(GuiceActorProducer.class, injector, QueueActor.class)), "router"); } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java index dbd5235..2f70088 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java @@ -20,55 +20,20 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import akka.actor.UntypedActor; -import com.codahale.metrics.Timer; -import com.google.inject.Injector; -import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; -import org.apache.usergrid.persistence.qakka.App; -import org.apache.usergrid.persistence.qakka.MetricsService; -import org.apache.usergrid.persistence.qakka.QakkaFig; -import org.apache.usergrid.persistence.qakka.core.CassandraClient; -import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; -import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; +import com.google.inject.Inject; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest; -import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; -import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator; -import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; -import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; -import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; -import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.text.DecimalFormat; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; - public class QueueRefresher extends UntypedActor { private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class ); - private final String queueName; - private final InMemoryQueue inMemoryQueue; - private final QakkaFig qakkaFig; - private final ActorSystemFig actorSystemFig; - private final MetricsService metricsService; - private final CassandraClient cassandraClient; - - private final AtomicLong runCount = new AtomicLong(0); - private final AtomicLong totalRead = new AtomicLong(0); - + final QueueActorHelper helper; - public QueueRefresher(String queueName ) { - this.queueName = queueName; - - Injector injector = App.INJECTOR; - - inMemoryQueue = injector.getInstance( InMemoryQueue.class ); - qakkaFig = injector.getInstance( QakkaFig.class ); - actorSystemFig = injector.getInstance( ActorSystemFig.class ); - metricsService = injector.getInstance( MetricsService.class ); - cassandraClient = injector.getInstance( CassandraClientImpl.class ); + @Inject + public QueueRefresher( QueueActorHelper helper ) { + this.helper = helper; } @@ -78,78 +43,9 @@ public class QueueRefresher extends UntypedActor { if ( message instanceof QueueRefreshRequest ) { QueueRefreshRequest request = (QueueRefreshRequest) message; - - //logger.debug( "running for queue {}", queueName ); - - if (!request.getQueueName().equals( queueName )) { - throw new QakkaRuntimeException( - "QueueWriter for " + queueName + ": Incorrect queueName " + request.getQueueName() ); - } - - Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time(); - - try { - - if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) { - - ShardIterator shardIterator = new ShardIterator( - cassandraClient, queueName, actorSystemFig.getRegionLocal(), - Shard.Type.DEFAULT, Optional.empty() ); - - UUID since = inMemoryQueue.getNewest( queueName ); - - String region = actorSystemFig.getRegionLocal(); - MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( - cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT, - shardIterator, since); - - int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName ); - int count = 0; - - while ( multiShardIterator.hasNext() && count < need ) { - DatabaseQueueMessage queueMessage = multiShardIterator.next(); - inMemoryQueue.add( queueName, queueMessage ); - count++; - } - - long runs = runCount.incrementAndGet(); - long readCount = totalRead.addAndGet( count ); - - if ( logger.isDebugEnabled() && runs % 100 == 0 ) { - - final DecimalFormat format = new DecimalFormat("##.###"); - final long nano = 1000000000; - Timer t = metricsService.getMetricRegistry().timer(MetricsService.REFRESH_TIME ); - - logger.debug("QueueRefresher for queue '{}' stats:\n" + - " Num runs={}\n" + - " Read count={}\n" + - " Mean={}\n" + - " One min rate={}\n" + - " Five min rate={}\n" + - " Snapshot mean={}\n" + - " Snapshot min={}\n" + - " Snapshot max={}", - queueName, - t.getCount(), - readCount, - format.format( t.getMeanRate() ), - format.format( t.getOneMinuteRate() ), - format.format( t.getFiveMinuteRate() ), - format.format( t.getSnapshot().getMean() / nano ), - format.format( (double) t.getSnapshot().getMin() / nano ), - format.format( (double) t.getSnapshot().getMax() / nano ) ); - } - -// if ( count > 0 ) { -// logger.debug( "Added {} in-memory for queue {}, new size = {}", -// count, queueName, inMemoryQueue.size( queueName ) ); -// } - } - - } finally { - timer.close(); - } + logger.debug( "running for queue {}", request.getQueueName() ); + String queueName = request.getQueueName(); + helper.queueRefresh( queueName ); } else { unhandled( message ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java index 03d1216..739e1c4 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java @@ -25,6 +25,7 @@ import akka.cluster.client.ClusterClient; import akka.pattern.Patterns; import akka.util.Timeout; import com.codahale.metrics.Timer; +import com.google.inject.Inject; import com.google.inject.Injector; import org.apache.commons.lang3.RandomStringUtils; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; @@ -63,9 +64,9 @@ public class QueueSender extends UntypedActor { private final QakkaFig qakkaFig; private final MetricsService metricsService; - public QueueSender() { - Injector injector = App.INJECTOR; + @Inject + public QueueSender( Injector injector ) { actorSystemManager = injector.getInstance( ActorSystemManager.class ); transferLogSerialization = injector.getInstance( TransferLogSerialization.class ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java index 20603a5..92d0785 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java @@ -23,6 +23,9 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; import akka.routing.FromConfig; +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest; @@ -34,10 +37,11 @@ public class QueueSenderRouter extends UntypedActor { private final ActorRef router; - public QueueSenderRouter() { + @Inject + public QueueSenderRouter( Injector injector ) { - router = getContext().actorOf( - FromConfig.getInstance().props(Props.create(QueueSender.class )), "router"); + this.router = getContext().actorOf( FromConfig.getInstance().props( + Props.create( GuiceActorProducer.class, injector, QueueSender.class )), "router"); } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java index b47aac6..9b11277 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java @@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import akka.actor.UntypedActor; import com.codahale.metrics.Timer; +import com.google.inject.Inject; import com.google.inject.Injector; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.qakka.App; @@ -49,8 +50,6 @@ import java.util.concurrent.atomic.AtomicLong; public class QueueTimeouter extends UntypedActor { private static final Logger logger = LoggerFactory.getLogger( QueueTimeouter.class ); - private final String queueName; - private final QueueMessageSerialization messageSerialization; private final MetricsService metricsService; private final ActorSystemFig actorSystemFig; @@ -62,10 +61,8 @@ public class QueueTimeouter extends UntypedActor { private final AtomicLong totalTimedout = new AtomicLong(0); - public QueueTimeouter(String queueName ) { - this.queueName = queueName; - - Injector injector = App.INJECTOR; + @Inject + public QueueTimeouter( Injector injector) { messageSerialization = injector.getInstance( QueueMessageSerialization.class ); actorSystemFig = injector.getInstance( ActorSystemFig.class ); @@ -88,10 +85,7 @@ public class QueueTimeouter extends UntypedActor { QueueTimeoutRequest request = (QueueTimeoutRequest) message; - if (!request.getQueueName().equals( queueName )) { - throw new QakkaRuntimeException( - "QueueTimeouter for " + queueName + ": Incorrect queueName " + request.getQueueName() ); - } + String queueName = request.getQueueName(); //logger.debug("Processing timeouts for queue {} ", queueName ); @@ -171,12 +165,12 @@ public class QueueTimeouter extends UntypedActor { format.format( (double) t.getSnapshot().getMax() / nano ) ); } -// if (count > 0) { -// logger.debug( "Timed out {} messages for queue {}", count, queueName ); -// -// messageCounterSerialization.decrementCounter( -// queueName, DatabaseQueueMessage.Type.DEFAULT, count); -// } + if (count > 0) { + logger.debug( "Timed out {} messages for queue {}", count, queueName ); + + messageCounterSerialization.decrementCounter( + queueName, DatabaseQueueMessage.Type.DEFAULT, count); + } } finally { timer.close(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java index e54d916..273f0b2 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java @@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import akka.actor.UntypedActor; import com.codahale.metrics.Timer; +import com.google.inject.Inject; import com.google.inject.Injector; import org.apache.usergrid.persistence.qakka.App; import org.apache.usergrid.persistence.qakka.MetricsService; @@ -54,9 +55,8 @@ public class QueueWriter extends UntypedActor { private final MessageCounterSerialization messageCounterSerialization; - public QueueWriter() { - - Injector injector = App.INJECTOR; + @Inject + public QueueWriter( Injector injector ) { messageSerialization = injector.getInstance( QueueMessageSerialization.class ); transferLogSerialization = injector.getInstance( TransferLogSerialization.class ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java index 9cf06d9..f0540af 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java @@ -23,6 +23,9 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; import akka.routing.FromConfig; +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest; @@ -33,11 +36,11 @@ public class QueueWriterRouter extends UntypedActor { private final ActorRef router; + @Inject + public QueueWriterRouter( Injector injector ) { - public QueueWriterRouter() { - - router = getContext().actorOf( - FromConfig.getInstance().props(Props.create(QueueWriter.class )), "router"); + this.router = getContext().actorOf( FromConfig.getInstance().props( + Props.create( GuiceActorProducer.class, injector, QueueWriter.class )), "router"); } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java index 46e4906..65c3370 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java @@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import akka.actor.UntypedActor; import com.codahale.metrics.Timer; import com.datastax.driver.core.utils.UUIDs; +import com.google.inject.Inject; import com.google.inject.Injector; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.qakka.App; @@ -49,8 +50,6 @@ import java.util.UUID; public class ShardAllocator extends UntypedActor { private static final Logger logger = LoggerFactory.getLogger( ShardAllocator.class ); - private final String queueName; - private final QakkaFig qakkaFig; private final ActorSystemFig actorSystemFig; private final ShardSerialization shardSerialization; @@ -59,10 +58,8 @@ public class ShardAllocator extends UntypedActor { private final CassandraClient cassandraClient; - public ShardAllocator( String queueName ) { - this.queueName = queueName; - - Injector injector = App.INJECTOR; + @Inject + public ShardAllocator( Injector injector ) { this.qakkaFig = injector.getInstance( QakkaFig.class ); this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class ); @@ -70,8 +67,6 @@ public class ShardAllocator extends UntypedActor { this.actorSystemFig = injector.getInstance( ActorSystemFig.class ); this.metricsService = injector.getInstance( MetricsService.class ); this.cassandraClient = injector.getInstance( CassandraClientImpl.class ); - - logger.debug( "Created shard allocator for queue {}", queueName ); } @@ -82,14 +77,9 @@ public class ShardAllocator extends UntypedActor { ShardCheckRequest request = (ShardCheckRequest) message; - if (!request.getQueueName().equals( queueName )) { - throw new QakkaRuntimeException( - "ShardAllocator for " + queueName + ": Incorrect queueName " + request.getQueueName() ); - } - // check both types of shard - checkLatestShard( Shard.Type.DEFAULT ); - checkLatestShard( Shard.Type.INFLIGHT ); + checkLatestShard( request.getQueueName(), Shard.Type.DEFAULT ); + checkLatestShard( request.getQueueName(), Shard.Type.INFLIGHT ); } else { unhandled( message ); @@ -97,7 +87,7 @@ public class ShardAllocator extends UntypedActor { } - private void checkLatestShard( Shard.Type type ) { + private void checkLatestShard( String queueName, Shard.Type type ) { Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ALLOCATE_TIME).time(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index bcb6b79..e24bdb4 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.QueueManager; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; import org.apache.usergrid.persistence.qakka.distributed.messages.*; +import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; @@ -134,9 +135,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { String queueName, String sourceRegion, String destRegion, UUID messageId, Long deliveryTime, Long expirationTime ) { - List<String> queueNames = queueManager.getListOfQueues(); - if ( !queueNames.contains( queueName ) ) { - throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist"); + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue not found: " + queueName ); } int maxRetries = qakkaFig.getMaxSendRetries(); @@ -213,9 +213,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { public Collection<DatabaseQueueMessage> getNextMessagesInternal( String queueName, int count ) { - List<String> queueNames = queueManager.getListOfQueues(); - if ( !queueNames.contains( queueName ) ) { - throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist"); + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue not found: " + queueName ); } if ( actorSystemManager.getClientActor() == null || !actorSystemManager.isReady() ) { @@ -280,9 +279,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { @Override public Status ackMessage(String queueName, UUID queueMessageId ) { - List<String> queueNames = queueManager.getListOfQueues(); - if ( !queueNames.contains( queueName ) ) { - return Status.BAD_REQUEST; + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue not found: " + queueName ); } QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId ); @@ -293,9 +291,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { @Override public Status requeueMessage(String queueName, UUID messageId) { - List<String> queueNames = queueManager.getListOfQueues(); - if ( !queueNames.contains( queueName ) ) { - throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist"); + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue not found: " + queueName ); } QueueAckRequest message = new QueueAckRequest( queueName, messageId ); @@ -306,9 +303,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { @Override public Status clearMessages(String queueName) { - List<String> queueNames = queueManager.getListOfQueues(); - if ( !queueNames.contains( queueName ) ) { - throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist"); + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue not found: " + queueName ); } // TODO: implement clear queue http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java index c10d1f5..8ce9822 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java @@ -70,9 +70,9 @@ public class QueueMessageManagerTest extends AbstractTest { @Test public void testBasicOperation() throws Exception { - Injector injector = getInjector(); + String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15); - CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); + Injector injector = getInjector(); DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class ); @@ -82,54 +82,60 @@ public class QueueMessageManagerTest extends AbstractTest { app.start( "localhost", getNextAkkaPort(), region ); // create queue and send one message to it - String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15); QueueManager queueManager = injector.getInstance( QueueManager.class ); - QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class ); - queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null )); - String jsonData = "{}"; - qmm.sendMessages( queueName, Collections.singletonList(region), null, null, - "application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED) ); - distributedQueueService.refresh(); - Thread.sleep(1000); + try { - // get message from the queue - List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 ); - Assert.assertEquals( 1, messages.size() ); - QueueMessage message = messages.get(0); + QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class ); + queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) ); + String jsonData = "{}"; + qmm.sendMessages( queueName, Collections.singletonList( region ), null, null, + "application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED ) ); - // test that queue message data is present and correct - QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class ); - DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() ); - Assert.assertNotNull( data ); - Assert.assertEquals( "application/json", data.getContentType() ); - String jsonDataReturned = new String( data.getBlob().array(), Charset.forName("UTF-8") ); - Assert.assertEquals( jsonData, jsonDataReturned ); - - // test that transfer log is empty for our queue - TransferLogSerialization tlogs = injector.getInstance( TransferLogSerialization.class ); - Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 ); - List<TransferLog> logs = all.getEntities().stream() + distributedQueueService.refresh(); + Thread.sleep( 1000 ); + + // get message from the queue + List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 ); + Assert.assertEquals( 1, messages.size() ); + QueueMessage message = messages.get( 0 ); + + // test that queue message data is present and correct + QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class ); + DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() ); + Assert.assertNotNull( data ); + Assert.assertEquals( "application/json", data.getContentType() ); + String jsonDataReturned = new String( data.getBlob().array(), Charset.forName( "UTF-8" ) ); + Assert.assertEquals( jsonData, jsonDataReturned ); + + // test that transfer log is empty for our queue + TransferLogSerialization tlogs = injector.getInstance( TransferLogSerialization.class ); + Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 ); + List<TransferLog> logs = all.getEntities().stream() .filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() ); - Assert.assertTrue( logs.isEmpty() ); + Assert.assertTrue( logs.isEmpty() ); - // ack the message - qmm.ackMessage( queueName, message.getQueueMessageId() ); + // ack the message + qmm.ackMessage( queueName, message.getQueueMessageId() ); - // test that message is no longer stored in non-replicated keyspace + // test that message is no longer stored in non-replicated keyspace - Assert.assertNull( qms.loadMessage( queueName, region, null, - DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() )); + Assert.assertNull( qms.loadMessage( queueName, region, null, + DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() ) ); - Assert.assertNull( qms.loadMessage( queueName, region, null, - DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() )); + Assert.assertNull( qms.loadMessage( queueName, region, null, + DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() ) ); - // test that audit log entry was written - AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); - Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() ); - Assert.assertEquals( 3, auditLogs.getEntities().size() ); + // test that audit log entry was written + AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); + Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() ); + Assert.assertEquals( 3, auditLogs.getEntities().size() ); - distributedQueueService.shutdown(); + distributedQueueService.shutdown(); + + } finally { + queueManager.deleteQueue( queueName ); + } } @@ -138,8 +144,6 @@ public class QueueMessageManagerTest extends AbstractTest { Injector injector = getInjector(); - CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); QakkaFig qakkaFig = injector.getInstance( QakkaFig.class ); ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class ); @@ -152,74 +156,82 @@ public class QueueMessageManagerTest extends AbstractTest { // create some number of queue messages QueueManager queueManager = injector.getInstance( QueueManager.class ); - QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class ); - String queueName = "queue_testQueueMessageTimeouts_" + RandomStringUtils.randomAlphanumeric(15); - queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null )); - int numMessages = 40; + String queueName = "queue_testQueueMessageTimeouts_" + RandomStringUtils.randomAlphanumeric( 15 ); - for ( int i=0; i<numMessages; i++ ) { - qmm.sendMessages( + try { + + QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class ); + queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) ); + + int numMessages = 40; + + for (int i = 0; i < numMessages; i++) { + qmm.sendMessages( queueName, Collections.singletonList( region ), null, // delay null, // expiration "application/json", DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) ); - } + } - int maxRetries = 15; - int retries = 0; - while ( retries++ < maxRetries ) { - distributedQueueService.refresh(); - if (inMemoryQueue.size( queueName ) == 40) { - break; + int maxRetries = 15; + int retries = 0; + while (retries++ < maxRetries) { + distributedQueueService.refresh(); + if (inMemoryQueue.size( queueName ) == 40) { + break; + } + Thread.sleep( 500 ); } - Thread.sleep( 500 ); - } - Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) ); + Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) ); - // get all messages from queue + // get all messages from queue - List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages ); - Assert.assertEquals( numMessages, messages.size() ); + List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages ); + Assert.assertEquals( numMessages, messages.size() ); - // ack half of the messages + // ack half of the messages - List<QueueMessage> remove = new ArrayList<>(); + List<QueueMessage> remove = new ArrayList<>(); - for ( int i=0; i<numMessages/2; i++ ) { - QueueMessage queueMessage = messages.get( i ); - qmm.ackMessage( queueName, queueMessage.getQueueMessageId() ); - remove.add( queueMessage ); - } + for (int i = 0; i < numMessages / 2; i++) { + QueueMessage queueMessage = messages.get( i ); + qmm.ackMessage( queueName, queueMessage.getQueueMessageId() ); + remove.add( queueMessage ); + } - for ( QueueMessage message : remove ) { - messages.remove( message ); - } + for (QueueMessage message : remove) { + messages.remove( message ); + } - // wait for twice timeout period + // wait for twice timeout period - Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds()*1000 ); + Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds() * 1000 ); - distributedQueueService.processTimeouts(); + distributedQueueService.processTimeouts(); - Thread.sleep( qakkaFig.getQueueTimeoutSeconds()*1000 ); + Thread.sleep( qakkaFig.getQueueTimeoutSeconds() * 1000 ); - // attempt to ack other half of messages + // attempt to ack other half of messages - for ( QueueMessage message : messages ) { - try { - qmm.ackMessage( queueName, message.getQueueMessageId() ); - Assert.fail("Message should have timed out by now"); + for (QueueMessage message : messages) { + try { + qmm.ackMessage( queueName, message.getQueueMessageId() ); + Assert.fail( "Message should have timed out by now" ); - } catch ( QakkaRuntimeException expected ) { - // keep on going... + } catch (QakkaRuntimeException expected) { + // keep on going... + } } - } - distributedQueueService.shutdown(); + distributedQueueService.shutdown(); + + } finally { + queueManager.deleteQueue( queueName ); + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java index 7423424..53f9224 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java @@ -78,35 +78,42 @@ public class QueueActorServiceTest extends AbstractTest { QueueManager queueManager = injector.getInstance( QueueManager.class ); queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null )); - // send 1 queue message, get back one queue message - UUID messageId = UUIDGen.getTimeUUID(); + try { - final String data = "my test data"; - final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( + // send 1 queue message, get back one queue message + UUID messageId = UUIDGen.getTimeUUID(); + + final String data = "my test data"; + final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" ); - serialization.writeMessageData( messageId, messageBody ); + serialization.writeMessageData( messageId, messageBody ); - distributedQueueService.sendMessageToRegion( - queueName, region, region, messageId, null, null); + distributedQueueService.sendMessageToRegion( + queueName, region, region, messageId, null, null ); - distributedQueueService.refresh(); - Thread.sleep(1000); + distributedQueueService.refresh(); + Thread.sleep( 1000 ); - Collection<DatabaseQueueMessage> qmReturned = distributedQueueService.getNextMessages( queueName, 1 ); - Assert.assertEquals( 1, qmReturned.size() ); + Collection<DatabaseQueueMessage> qmReturned = distributedQueueService.getNextMessages( queueName, 1 ); + Assert.assertEquals( 1, qmReturned.size() ); - DatabaseQueueMessage dqm = qmReturned.iterator().next(); - DatabaseQueueMessageBody dqmb = serialization.loadMessageData( dqm.getMessageId() ); - ByteBuffer blob = dqmb.getBlob(); + DatabaseQueueMessage dqm = qmReturned.iterator().next(); + DatabaseQueueMessageBody dqmb = serialization.loadMessageData( dqm.getMessageId() ); + ByteBuffer blob = dqmb.getBlob(); - String returnedData = new String( blob.array(), "UTF-8"); + String returnedData = new String( blob.array(), "UTF-8" ); // ByteArrayInputStream bais = new ByteArrayInputStream( blob.array() ); // ObjectInputStream ios = new ObjectInputStream( bais ); // String returnedData = (String)ios.readObject(); - Assert.assertEquals( data, returnedData ); + Assert.assertEquals( data, returnedData ); + + distributedQueueService.shutdown(); + + } finally { + queueManager.deleteQueue( queueName ); + } - distributedQueueService.shutdown(); } @@ -128,51 +135,58 @@ public class QueueActorServiceTest extends AbstractTest { String queueName = "queue_testGetMultipleQueueMessages_" + UUID.randomUUID(); QueueManager queueManager = injector.getInstance( QueueManager.class ); - queueManager.createQueue( - new Queue( queueName, "test-type", region, region, 0L, 5, 10, null )); - for ( int i=0; i<100; i++ ) { + try { - UUID messageId = UUIDGen.getTimeUUID(); + queueManager.createQueue( + new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) ); - final String data = "my test data"; - final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( + for (int i = 0; i < 100; i++) { + + UUID messageId = UUIDGen.getTimeUUID(); + + final String data = "my test data"; + final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" ); - serialization.writeMessageData( messageId, messageBody ); + serialization.writeMessageData( messageId, messageBody ); - xferLogSerialization.recordTransferLog( - queueName, actorSystemFig.getRegionLocal(), region, messageId ); + xferLogSerialization.recordTransferLog( + queueName, actorSystemFig.getRegionLocal(), region, messageId ); - distributedQueueService.sendMessageToRegion( - queueName, region, region, messageId , null, null); - } + distributedQueueService.sendMessageToRegion( + queueName, region, region, messageId, null, null ); + } - int maxRetries = 25; - int retries = 0; - int count = 0; - while ( retries++ < maxRetries ) { - distributedQueueService.refresh(); - if (inMemoryQueue.size( queueName ) == 100) { - count = 100; - break; + int maxRetries = 25; + int retries = 0; + int count = 0; + while (retries++ < maxRetries) { + distributedQueueService.refresh(); + if (inMemoryQueue.size( queueName ) == 100) { + count = 100; + break; + } + Thread.sleep( 1000 ); } - Thread.sleep(1000); - } - Assert.assertEquals( 100, count ); + Assert.assertEquals( 100, count ); - Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 75, inMemoryQueue.size( queueName ) ); + Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); + Assert.assertEquals( 75, inMemoryQueue.size( queueName ) ); - Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 50, inMemoryQueue.size( queueName ) ); + Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); + Assert.assertEquals( 50, inMemoryQueue.size( queueName ) ); - Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 25, inMemoryQueue.size( queueName ) ); + Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); + Assert.assertEquals( 25, inMemoryQueue.size( queueName ) ); - Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 0, inMemoryQueue.size( queueName ) ); + Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); + Assert.assertEquals( 0, inMemoryQueue.size( queueName ) ); - distributedQueueService.shutdown(); + distributedQueueService.shutdown(); + + } finally { + queueManager.deleteQueue( queueName ); + } } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java index 3bf352f..791650e 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java @@ -53,7 +53,6 @@ public class QueueActorHelperTest extends AbstractTest { public void loadDatabaseQueueMessage() throws Exception { Injector injector = getInjector(); - CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); injector.getInstance( App.class ); // init the INJECTOR @@ -66,33 +65,39 @@ public class QueueActorHelperTest extends AbstractTest { app.start( "localhost", getNextAkkaPort(), region ); String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); - queueManager.createQueue( new Queue( queueName ) ); - UUID queueMessageId = QakkaUtils.getTimeUuid(); + try { + queueManager.createQueue( new Queue( queueName ) ); - // write message + UUID queueMessageId = QakkaUtils.getTimeUuid(); - DatabaseQueueMessage message = new DatabaseQueueMessage( - QakkaUtils.getTimeUuid(), - DatabaseQueueMessage.Type.DEFAULT, - queueName, - actorSystemFig.getRegionLocal(), - null, - System.currentTimeMillis(), - null, - queueMessageId); - qms.writeMessage( message ); + // write message + + DatabaseQueueMessage message = new DatabaseQueueMessage( + QakkaUtils.getTimeUuid(), + DatabaseQueueMessage.Type.DEFAULT, + queueName, + actorSystemFig.getRegionLocal(), + null, + System.currentTimeMillis(), + null, + queueMessageId); + qms.writeMessage( message ); - // load message + // load message - QueueActorHelper helper = injector.getInstance( QueueActorHelper.class ); - DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage( - queueName, message.getQueueMessageId(), message.getType() ); + QueueActorHelper helper = injector.getInstance( QueueActorHelper.class ); + DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage( + queueName, message.getQueueMessageId(), message.getType() ); - Assert.assertNotNull( queueMessage ); + Assert.assertNotNull( queueMessage ); - DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); - distributedQueueService.shutdown(); + DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); + distributedQueueService.shutdown(); + + } finally { + queueManager.deleteQueue( queueName ); + } } @@ -100,8 +105,6 @@ public class QueueActorHelperTest extends AbstractTest { public void loadDatabaseQueueMessageNotFound() throws Exception { Injector injector = getInjector(); - CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - injector.getInstance( App.class ); // init the INJECTOR QueueManager queueManager = injector.getInstance( QueueManager.class ); @@ -112,20 +115,27 @@ public class QueueActorHelperTest extends AbstractTest { app.start( "localhost", getNextAkkaPort(), region ); String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + queueManager.createQueue( new Queue( queueName ) ); - // don't write any message + try { + + // don't write any message - // load message + // load message - QueueActorHelper helper = injector.getInstance( QueueActorHelper.class ); - DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage( + QueueActorHelper helper = injector.getInstance( QueueActorHelper.class ); + DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage( queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT ); - Assert.assertNull( queueMessage ); + Assert.assertNull( queueMessage ); - DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); - distributedQueueService.shutdown(); + DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); + distributedQueueService.shutdown(); + + } finally { + queueManager.deleteQueue( queueName ); + } } @@ -133,8 +143,6 @@ public class QueueActorHelperTest extends AbstractTest { public void putInflight() throws Exception { Injector injector = getInjector(); - CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - injector.getInstance( App.class ); // init the INJECTOR @@ -153,7 +161,9 @@ public class QueueActorHelperTest extends AbstractTest { String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); queueManager.createQueue( new Queue( queueName ) ); - DatabaseQueueMessage message = new DatabaseQueueMessage( + try { + + DatabaseQueueMessage message = new DatabaseQueueMessage( QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT, queueName, @@ -161,42 +171,46 @@ public class QueueActorHelperTest extends AbstractTest { null, System.currentTimeMillis(), null, - queueMessageId); - qms.writeMessage( message ); + queueMessageId ); + qms.writeMessage( message ); - // put message inflight + // put message inflight - QueueActorHelper helper = injector.getInstance( QueueActorHelper.class ); - helper.putInflight( queueName, message ); + QueueActorHelper helper = injector.getInstance( QueueActorHelper.class ); + helper.putInflight( queueName, message ); - // message must be gone from messages_available table + // message must be gone from messages_available table - Assert.assertNull( qms.loadMessage( + Assert.assertNull( qms.loadMessage( queueName, actorSystemFig.getRegionLocal(), null, DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() ) ); - // message must be present in messages_inflight table + // message must be present in messages_inflight table - Assert.assertNotNull( qms.loadMessage( + Assert.assertNotNull( qms.loadMessage( queueName, actorSystemFig.getRegionLocal(), null, DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() ) ); - // there must be an audit log record of the successful get operation + // there must be an audit log record of the successful get operation + + AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); + Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() ); + Assert.assertEquals( 1, auditLogs.getEntities().size() ); + Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get( 0 ).getStatus() ); + Assert.assertEquals( AuditLog.Action.GET, auditLogs.getEntities().get( 0 ).getAction() ); - AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); - Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() ); - Assert.assertEquals( 1, auditLogs.getEntities().size() ); - Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus() ); - Assert.assertEquals( AuditLog.Action.GET, auditLogs.getEntities().get(0).getAction() ); + DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); + distributedQueueService.shutdown(); - DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); - distributedQueueService.shutdown(); + } finally { + queueManager.deleteQueue( queueName ); + } } @@ -222,9 +236,11 @@ public class QueueActorHelperTest extends AbstractTest { String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); queueManager.createQueue( new Queue( queueName ) ); - // write message to messages_inflight table + try { - DatabaseQueueMessage message = new DatabaseQueueMessage( + // write message to messages_inflight table + + DatabaseQueueMessage message = new DatabaseQueueMessage( QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.INFLIGHT, queueName, @@ -232,34 +248,38 @@ public class QueueActorHelperTest extends AbstractTest { null, System.currentTimeMillis(), null, - queueMessageId); - qms.writeMessage( message ); + queueMessageId ); + qms.writeMessage( message ); + + // ack message - // ack message + QueueActorHelper helper = injector.getInstance( QueueActorHelper.class ); + helper.ackQueueMessage( queueName, message.getQueueMessageId() ); - QueueActorHelper helper = injector.getInstance( QueueActorHelper.class ); - helper.ackQueueMessage( queueName, message.getQueueMessageId() ); + // message must be gone from messages_available table - // message must be gone from messages_available table + Assert.assertNull( helper.loadDatabaseQueueMessage( + queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.INFLIGHT ) ); - Assert.assertNull( helper.loadDatabaseQueueMessage( - queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.INFLIGHT )); + // message must be gone from messages_inflight table - // message must be gone from messages_inflight table + Assert.assertNull( helper.loadDatabaseQueueMessage( + queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT ) ); - Assert.assertNull( helper.loadDatabaseQueueMessage( - queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT )); + // there should be an audit log record of the successful ack operation - // there should be an audit log record of the successful ack operation + AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); + Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() ); + Assert.assertEquals( 1, auditLogs.getEntities().size() ); + Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get( 0 ).getStatus() ); + Assert.assertEquals( AuditLog.Action.ACK, auditLogs.getEntities().get( 0 ).getAction() ); - AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); - Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() ); - Assert.assertEquals( 1, auditLogs.getEntities().size() ); - Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus() ); - Assert.assertEquals( AuditLog.Action.ACK, auditLogs.getEntities().get(0).getAction() ); + DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); + distributedQueueService.shutdown(); - DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); - distributedQueueService.shutdown(); + } finally { + queueManager.deleteQueue( queueName ); + } } @@ -267,8 +287,6 @@ public class QueueActorHelperTest extends AbstractTest { public void ackQueueMessageNotFound() throws Exception { Injector injector = getInjector(); - CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - injector.getInstance( App.class ); // init the INJECTOR QueueManager queueManager = injector.getInstance( QueueManager.class ); @@ -281,17 +299,23 @@ public class QueueActorHelperTest extends AbstractTest { String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); queueManager.createQueue( new Queue( queueName ) ); - // don't write message, just make up some bogus IDs + try { - UUID queueMessageId = QakkaUtils.getTimeUuid(); + // don't write message, just make up some bogus IDs + + UUID queueMessageId = QakkaUtils.getTimeUuid(); + + // ack message must fail - // ack message must fail + QueueActorHelper helper = injector.getInstance( QueueActorHelper.class ); + Assert.assertEquals( DistributedQueueService.Status.NOT_INFLIGHT, + helper.ackQueueMessage( queueName, queueMessageId ) ); - QueueActorHelper helper = injector.getInstance( QueueActorHelper.class ); - Assert.assertEquals( DistributedQueueService.Status.BAD_REQUEST, - helper.ackQueueMessage( queueName, queueMessageId )); + DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); + distributedQueueService.shutdown(); - DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); - distributedQueueService.shutdown(); + } finally { + queueManager.deleteQueue( queueName ); + } } }