Fixes to get tests running consistently.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/781b894a Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/781b894a Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/781b894a Branch: refs/heads/master Commit: 781b894a65851e4e8a4ff2fac84b644e5b72c12d Parents: 5a19ba9 Author: Dave Johnson <snoopd...@apache.org> Authored: Mon Oct 3 10:45:22 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Mon Oct 3 10:45:22 2016 -0400 ---------------------------------------------------------------------- .../qakka/core/impl/QueueManagerImpl.java | 36 +++++++++++++------- .../qakka/distributed/actors/QueueActor.java | 2 +- .../queue/impl/QakkaQueueManager.java | 8 ++--- .../qakka/api/QueueResourceTest.java | 12 +++++++ .../qakka/core/QueueMessageManagerTest.java | 2 +- .../distributed/QueueActorServiceTest.java | 1 + .../distributed/actors/ShardAllocatorTest.java | 2 +- .../queue/LegacyQueueManagerTest.java | 9 ++--- 8 files changed, 43 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java index 789edd4..a8139a1 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java @@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.qakka.core.impl; import com.google.inject.Inject; +import com.google.inject.spi.Message; import org.apache.commons.lang3.StringUtils; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.qakka.core.QakkaUtils; @@ -27,6 +28,8 @@ import org.apache.usergrid.persistence.qakka.core.Queue; import org.apache.usergrid.persistence.qakka.core.QueueManager; import org.apache.usergrid.persistence.qakka.core.Regions; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.apache.usergrid.persistence.qakka.serialization.queues.DatabaseQueue; import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization; import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; @@ -37,23 +40,26 @@ import java.util.List; public class QueueManagerImpl implements QueueManager { - private final ActorSystemFig actorSystemFig; - private final QueueSerialization queueSerialization; - private final DistributedQueueService distributedQueueService; - private final ShardSerialization shardSerialization; + private final ActorSystemFig actorSystemFig; + private final QueueSerialization queueSerialization; + private final DistributedQueueService distributedQueueService; + private final ShardSerialization shardSerialization; + private final MessageCounterSerialization messageCounterSerialization; @Inject public QueueManagerImpl( - ActorSystemFig actorSystemFig, - QueueSerialization queueSerialization, - DistributedQueueService distributedQueueService, - ShardSerialization shardSerialization ) { - - this.actorSystemFig = actorSystemFig; - this.queueSerialization = queueSerialization; - this.distributedQueueService = distributedQueueService; - this.shardSerialization = shardSerialization; + ActorSystemFig actorSystemFig, + QueueSerialization queueSerialization, + DistributedQueueService distributedQueueService, + ShardSerialization shardSerialization, + MessageCounterSerialization messageCounterSerialization) { + + this.actorSystemFig = actorSystemFig; + this.queueSerialization = queueSerialization; + this.distributedQueueService = distributedQueueService; + this.shardSerialization = shardSerialization; + this.messageCounterSerialization = messageCounterSerialization; } @Override @@ -87,6 +93,10 @@ public class QueueManagerImpl implements QueueManager { // only write the existence of a queue to the database if its dependent initial shards have been written queueSerialization.writeQueue(queue.toDatabaseQueue()); + // init counters + messageCounterSerialization.incrementCounter( queue.getName(), DatabaseQueueMessage.Type.DEFAULT, 0L ); + messageCounterSerialization.incrementCounter( queue.getName(), DatabaseQueueMessage.Type.INFLIGHT, 0L ); + distributedQueueService.initQueue( queue.getName() ); distributedQueueService.refreshQueue( queue.getName() ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java index 5ebba3d..c706f7d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java @@ -190,7 +190,7 @@ public class QueueActor extends UntypedActor { while (queueMessages.size() < queueGetRequest.getNumRequested()) { - DatabaseQueueMessage queueMessage = inMemoryQueue.peek( queueGetRequest.getQueueName() ); + DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() ); if (queueMessage != null) { if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java index f3cae86..4d81a64 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java @@ -60,6 +60,8 @@ public class QakkaQueueManager implements LegacyQueueManager { this.queueManager = queueManager; this.queueMessageManager = queueMessageManager; this.regions = regions; + + createQueueIfNecessary(); } @@ -79,8 +81,6 @@ public class QakkaQueueManager implements LegacyQueueManager { @Override public <T extends Serializable> void sendMessage(T body) throws IOException { - createQueueIfNecessary(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(body); @@ -107,8 +107,6 @@ public class QakkaQueueManager implements LegacyQueueManager { @Override public List<LegacyQueueMessage> getMessages(int limit, Class klass) { - createQueueIfNecessary(); - List<LegacyQueueMessage> messages = new ArrayList<>(); List<QueueMessage> qakkaMessages = queueMessageManager.getNextMessages( scope.getName(), limit ); @@ -149,8 +147,6 @@ public class QakkaQueueManager implements LegacyQueueManager { @Override public void commitMessage(LegacyQueueMessage queueMessage) { - createQueueIfNecessary(); - UUID queueMessageId = UUID.fromString( queueMessage.getMessageId() ); queueMessageManager.ackMessage( scope.getName(), queueMessageId ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java index fcb4212..c05281c 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java @@ -76,6 +76,9 @@ public class QueueResourceTest extends AbstractRestTest { Assert.assertFalse( apiResponse.getQueues().isEmpty() ); Assert.assertEquals( 1, apiResponse.getQueues().size() ); Assert.assertEquals( queueName, apiResponse.getQueues().iterator().next().getName() ); + + response = target("queues").path( queueName ).queryParam( "confirm", true ).request().delete(); + Assert.assertEquals( 200, response.getStatus() ); } @@ -182,6 +185,9 @@ public class QueueResourceTest extends AbstractRestTest { // get all messages, checking for dups checkJsonMessages( queueName, numMessages ); + + Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete(); + Assert.assertEquals( 200, response.getStatus() ); } @@ -257,6 +263,9 @@ public class QueueResourceTest extends AbstractRestTest { // get all messages, checking for dups checkBinaryMessages( queueName, numMessages ); + + Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete(); + Assert.assertEquals( 200, response.getStatus() ); } @@ -373,6 +382,9 @@ public class QueueResourceTest extends AbstractRestTest { // and, those same messages should be available again in the queue checkJsonMessages( queueName, numMessages/2 ); + + response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete(); + Assert.assertEquals( 200, response.getStatus() ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java index 8ce9822..9e0a9d8 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java @@ -180,7 +180,7 @@ public class QueueMessageManagerTest extends AbstractTest { int retries = 0; while (retries++ < maxRetries) { distributedQueueService.refresh(); - if (inMemoryQueue.size( queueName ) == 40) { + if (qmm.getQueueDepth( queueName ) == 40) { break; } Thread.sleep( 500 ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java index 53f9224..a5c95bd 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java @@ -166,6 +166,7 @@ public class QueueActorServiceTest extends AbstractTest { count = 100; break; } + count = inMemoryQueue.size( queueName ); Thread.sleep( 1000 ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java index b602177..c6831b7 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java @@ -203,10 +203,10 @@ public class ShardAllocatorTest extends AbstractTest { null, // expiration "application/json", DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) ); + Thread.sleep( 10 ); } distributedQueueService.refresh(); - Thread.sleep( 3000 ); // Test that 8 shards were created http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java index 5800bba..65c3309 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java @@ -57,8 +57,6 @@ public class LegacyQueueManagerTest extends AbstractTest { Injector myInjector = getInjector(); - CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class ); - ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class ); String region = actorSystemFig.getRegionLocal(); @@ -83,7 +81,7 @@ public class LegacyQueueManagerTest extends AbstractTest { } messageList = qm.getMessages(1, String.class); - assertTrue(messageList.size() <= 0); + assertEquals( 0, messageList.size() ); DistributedQueueService distributedQueueService = myInjector.getInstance( DistributedQueueService.class ); distributedQueueService.shutdown(); @@ -125,20 +123,17 @@ public class LegacyQueueManagerTest extends AbstractTest { qm.commitMessages(messageList); messageList = qm.getMessages(1, values.getClass()); - assertTrue(messageList.size() <= 0); + assertEquals( 0, messageList.size()); DistributedQueueService distributedQueueService = myInjector.getInstance( DistributedQueueService.class ); distributedQueueService.shutdown(); } @Test - @Ignore("Not implemented yet") public void queueSize() throws Exception{ Injector myInjector = getInjector(); - CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class ); - ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class ); String region = actorSystemFig.getRegionLocal();