Fixes to get tests running consistently.

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/781b894a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/781b894a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/781b894a

Branch: refs/heads/master
Commit: 781b894a65851e4e8a4ff2fac84b644e5b72c12d
Parents: 5a19ba9
Author: Dave Johnson <snoopd...@apache.org>
Authored: Mon Oct 3 10:45:22 2016 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Mon Oct 3 10:45:22 2016 -0400

----------------------------------------------------------------------
 .../qakka/core/impl/QueueManagerImpl.java       | 36 +++++++++++++-------
 .../qakka/distributed/actors/QueueActor.java    |  2 +-
 .../queue/impl/QakkaQueueManager.java           |  8 ++---
 .../qakka/api/QueueResourceTest.java            | 12 +++++++
 .../qakka/core/QueueMessageManagerTest.java     |  2 +-
 .../distributed/QueueActorServiceTest.java      |  1 +
 .../distributed/actors/ShardAllocatorTest.java  |  2 +-
 .../queue/LegacyQueueManagerTest.java           |  9 ++---
 8 files changed, 43 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
index 789edd4..a8139a1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.qakka.core.impl;
 
 import com.google.inject.Inject;
+import com.google.inject.spi.Message;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
@@ -27,6 +28,8 @@ import org.apache.usergrid.persistence.qakka.core.Queue;
 import org.apache.usergrid.persistence.qakka.core.QueueManager;
 import org.apache.usergrid.persistence.qakka.core.Regions;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queues.DatabaseQueue;
 import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
@@ -37,23 +40,26 @@ import java.util.List;
 
 
 public class QueueManagerImpl implements QueueManager {
-    private final ActorSystemFig          actorSystemFig;
-    private final QueueSerialization queueSerialization;
-    private final DistributedQueueService distributedQueueService;
-    private final ShardSerialization      shardSerialization;
+    private final ActorSystemFig              actorSystemFig;
+    private final QueueSerialization          queueSerialization;
+    private final DistributedQueueService     distributedQueueService;
+    private final ShardSerialization          shardSerialization;
+    private final MessageCounterSerialization messageCounterSerialization;
 
 
     @Inject
     public QueueManagerImpl(
-            ActorSystemFig          actorSystemFig,
-            QueueSerialization      queueSerialization,
-            DistributedQueueService distributedQueueService,
-            ShardSerialization      shardSerialization ) {
-
-        this.actorSystemFig          = actorSystemFig;
-        this.queueSerialization      = queueSerialization;
-        this.distributedQueueService = distributedQueueService;
-        this.shardSerialization      = shardSerialization;
+        ActorSystemFig              actorSystemFig,
+        QueueSerialization          queueSerialization,
+        DistributedQueueService     distributedQueueService,
+        ShardSerialization          shardSerialization,
+        MessageCounterSerialization messageCounterSerialization) {
+
+        this.actorSystemFig              = actorSystemFig;
+        this.queueSerialization          = queueSerialization;
+        this.distributedQueueService     = distributedQueueService;
+        this.shardSerialization          = shardSerialization;
+        this.messageCounterSerialization = messageCounterSerialization;
     }
 
     @Override
@@ -87,6 +93,10 @@ public class QueueManagerImpl implements QueueManager {
         // only write the existence of a queue to the database if its dependent initial shards have been written
         queueSerialization.writeQueue(queue.toDatabaseQueue());
 
+        // init counters
+        messageCounterSerialization.incrementCounter( queue.getName(), DatabaseQueueMessage.Type.DEFAULT, 0L );
+        messageCounterSerialization.incrementCounter( queue.getName(), DatabaseQueueMessage.Type.INFLIGHT, 0L );
+
         distributedQueueService.initQueue( queue.getName() );
         distributedQueueService.refreshQueue( queue.getName() );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 5ebba3d..c706f7d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -190,7 +190,7 @@ public class QueueActor extends UntypedActor {
 
                 while (queueMessages.size() < queueGetRequest.getNumRequested()) {
 
-                    DatabaseQueueMessage queueMessage = inMemoryQueue.peek( queueGetRequest.getQueueName() );
+                    DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() );
 
                     if (queueMessage != null) {
                         if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index f3cae86..4d81a64 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -60,6 +60,8 @@ public class QakkaQueueManager implements LegacyQueueManager {
         this.queueManager = queueManager;
         this.queueMessageManager = queueMessageManager;
         this.regions = regions;
+
+        createQueueIfNecessary();
     }
 
 
@@ -79,8 +81,6 @@ public class QakkaQueueManager implements LegacyQueueManager {
     @Override
     public <T extends Serializable> void sendMessage(T body) throws IOException {
 
-        createQueueIfNecessary();
-
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bos);
         oos.writeObject(body);
@@ -107,8 +107,6 @@ public class QakkaQueueManager implements LegacyQueueManager {
     @Override
     public List<LegacyQueueMessage> getMessages(int limit, Class klass) {
 
-        createQueueIfNecessary();
-
         List<LegacyQueueMessage> messages = new ArrayList<>();
         List<QueueMessage> qakkaMessages = queueMessageManager.getNextMessages( scope.getName(), limit );
 
@@ -149,8 +147,6 @@ public class QakkaQueueManager implements LegacyQueueManager {
     @Override
     public void commitMessage(LegacyQueueMessage queueMessage) {
 
-        createQueueIfNecessary();
-
         UUID queueMessageId  = UUID.fromString( queueMessage.getMessageId() );
         queueMessageManager.ackMessage( scope.getName(), queueMessageId );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
index fcb4212..c05281c 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
@@ -76,6 +76,9 @@ public class QueueResourceTest extends AbstractRestTest {
         Assert.assertFalse( apiResponse.getQueues().isEmpty() );
         Assert.assertEquals( 1, apiResponse.getQueues().size() );
         Assert.assertEquals( queueName, apiResponse.getQueues().iterator().next().getName() );
+
+        response = target("queues").path( queueName ).queryParam( "confirm", true ).request().delete();
+        Assert.assertEquals( 200, response.getStatus() );
     }
 
 
@@ -182,6 +185,9 @@ public class QueueResourceTest extends AbstractRestTest {
         // get all messages, checking for dups
 
         checkJsonMessages( queueName, numMessages );
+
+        Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+        Assert.assertEquals( 200, response.getStatus() );
     }
 
 
@@ -257,6 +263,9 @@ public class QueueResourceTest extends AbstractRestTest {
         // get all messages, checking for dups
 
         checkBinaryMessages( queueName, numMessages );
+
+        Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+        Assert.assertEquals( 200, response.getStatus() );
     }
 
 
@@ -373,6 +382,9 @@ public class QueueResourceTest extends AbstractRestTest {
         // and, those same messages should be available again in the queue
 
         checkJsonMessages( queueName, numMessages/2 );
+
+        response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+        Assert.assertEquals( 200, response.getStatus() );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 8ce9822..9e0a9d8 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -180,7 +180,7 @@ public class QueueMessageManagerTest extends AbstractTest {
             int retries = 0;
             while (retries++ < maxRetries) {
                 distributedQueueService.refresh();
-                if (inMemoryQueue.size( queueName ) == 40) {
+                if (qmm.getQueueDepth( queueName ) == 40) {
                     break;
                 }
                 Thread.sleep( 500 );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 53f9224..a5c95bd 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -166,6 +166,7 @@ public class QueueActorServiceTest extends AbstractTest {
                     count = 100;
                     break;
                 }
+                count = inMemoryQueue.size( queueName );
                 Thread.sleep( 1000 );
             }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index b602177..c6831b7 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -203,10 +203,10 @@ public class ShardAllocatorTest extends AbstractTest {
                     null, // expiration
                     "application/json",
                     DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
+                Thread.sleep( 10 );
             }
 
             distributedQueueService.refresh();
-            Thread.sleep( 3000 );
 
             // Test that 8 shards were created
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/781b894a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
index 5800bba..65c3309 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
@@ -57,8 +57,6 @@ public class LegacyQueueManagerTest extends AbstractTest {
 
         Injector myInjector = getInjector();
 
-        CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
-
         ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
         String region = actorSystemFig.getRegionLocal();
 
@@ -83,7 +81,7 @@ public class LegacyQueueManagerTest extends AbstractTest {
         }
 
         messageList = qm.getMessages(1, String.class);
-        assertTrue(messageList.size() <= 0);
+        assertEquals( 0, messageList.size() );
 
         DistributedQueueService distributedQueueService = myInjector.getInstance( DistributedQueueService.class );
         distributedQueueService.shutdown();
@@ -125,20 +123,17 @@ public class LegacyQueueManagerTest extends AbstractTest {
         qm.commitMessages(messageList);
 
         messageList = qm.getMessages(1, values.getClass());
-        assertTrue(messageList.size() <= 0);
+        assertEquals( 0, messageList.size());
 
         DistributedQueueService distributedQueueService = myInjector.getInstance( DistributedQueueService.class );
         distributedQueueService.shutdown();
     }
 
     @Test
-    @Ignore("Not implemented yet")
     public void queueSize() throws Exception{
 
         Injector myInjector = getInjector();
 
-        CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
-
         ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
         String region = actorSystemFig.getRegionLocal();
 

Reply via email to