Message counter fixes

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fc68e427
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fc68e427
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fc68e427

Branch: refs/heads/master
Commit: fc68e4270de147d0c37cd1fc31c0407a25c6a711
Parents: 2a86795
Author: Dave Johnson <snoopd...@apache.org>
Authored: Thu Nov 10 10:43:32 2016 -0500
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Thu Nov 10 10:43:32 2016 -0500

----------------------------------------------------------------------
 stack/corepersistence/common/pom.xml            |  4 ++
 .../impl/MessageCounterSerializationImpl.java   | 65 +++++++++----------
 .../distributed/QueueActorServiceTest.java      | 67 +++++++++++++++++++-
 .../distributed/actors/ShardAllocatorTest.java  |  4 +-
 .../DatabaseQueueMessageSerializationTest.java  |  5 +-
 .../queue/src/test/resources/qakka.properties   |  2 +
 .../rest/system/QueueSystemResource.java        |  2 +-
 7 files changed, 105 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/corepersistence/common/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/pom.xml b/stack/corepersistence/common/pom.xml
index 63d339b..aacec73 100644
--- a/stack/corepersistence/common/pom.xml
+++ b/stack/corepersistence/common/pom.xml
@@ -80,6 +80,10 @@
                     <artifactId>netty</artifactId>
                     <groupId>io.netty</groupId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>logback-classic</artifactId>
+                    <groupId>ch.qos.logback</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index 2e7722d..c17c34f 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -86,6 +86,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
         final AtomicLong decrement = new AtomicLong( 0L );
         long lastWritten = 0L;
 
+
         InMemoryCount( long baseCount ) {
             this.baseCount = baseCount;
         }
@@ -95,7 +96,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
         }
         public void decrement( long dec ) {
             this.decrement.addAndGet( dec );
-            this.totalInMemoryCount.addAndGet( -dec );
+            this.totalInMemoryCount.addAndGet( dec );
         }
         public long getIncrement() {
             return increment.get();
@@ -114,6 +115,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
             this.baseCount = 0;
             this.increment.set( 0L );
             this.decrement.set( 0L );
+            this.totalInMemoryCount.set( 0L );
         }
         public long value() {
             // return totalInMemoryCount.get(); // for testing using just 
in-memory counter:
@@ -135,7 +137,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
         this.cassandraConfig = cassandraConfig;
         this.maxChangesBeforeSave = qakkaFig.getMessageCounterMaxInMemory();
         this.cassandraClient = cassandraClient;
-        this.writeTimeout = qakkaFig.getShardCounterWriteTimeoutMillis();
+        this.writeTimeout = qakkaFig.getMessageCounterWriteTimeoutMillis();
     }
 
 
@@ -149,32 +151,30 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
 
         String key = buildKey( queueName, type );
 
-        synchronized ( inMemoryCounters ) {
+        synchronized (inMemoryCounters) {
 
             if (inMemoryCounters.get( key ) == null) {
 
-                Long value = retrieveCounterFromStorage( queueName, type );
+                    Long value = retrieveCounterFromStorage( queueName, type );
 
-                if (value == null) {
-                    incrementCounterInStorage( queueName, type, 0L );
-                    inMemoryCounters.put( key, new InMemoryCount( 0L ) );
-                } else {
-                    inMemoryCounters.put( key, new InMemoryCount( value ) );
+                    if (value == null) {
+                        incrementCounterInStorage( queueName, type, 0L );
+                        inMemoryCounters.put( key, new InMemoryCount( 0L ) );
+                    } else {
+                        inMemoryCounters.put( key, new InMemoryCount( value ) 
);
+                    }
                 }
-            }
-        }
 
-        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+            InMemoryCount inMemoryCount = inMemoryCounters.get( key );
 
-        synchronized ( inMemoryCount ) {
             inMemoryCount.increment( increment );
             saveIfNeeded( queueName, type );
         }
 
         if ( logger.isDebugEnabled() ) {
-            long value = inMemoryCounters.get( key ).value();
+            long value = getCounterValue( queueName, type );
             if (value <= 0) {
-                logger.debug( "Queue {} type {} incremented {} count = {}", 
queueName, type, increment, value );
+                logger.debug( "Queue {} type {} incremented {} count {}", 
queueName, type, increment, value );
             }
         }
     }
@@ -198,19 +198,17 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
                     inMemoryCounters.put( key, new InMemoryCount( value ) );
                 }
             }
-        }
 
-        final InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+            final InMemoryCount inMemoryCount = inMemoryCounters.get( key );
 
-        synchronized ( inMemoryCount ) {
             inMemoryCount.decrement( decrement );
             saveIfNeeded( queueName, type );
         }
 
         if ( logger.isDebugEnabled() ) {
-            long value = inMemoryCounters.get( key ).value();
+            long value = getCounterValue( queueName, type );
             if (value <= 0) {
-                logger.debug( "Queue {} type {} incremented count = {}", 
queueName, type, value );
+                logger.debug( "Queue {} type {} decremented {} count {}", 
queueName, type, decrement, value );
             }
         }
     }
@@ -233,17 +231,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
             }
         }
 
-        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
-
-        synchronized ( inMemoryCount ) {
-
-            if ( inMemoryCount.needsUpdate() ) {
-                long totalIncrement = inMemoryCount.getIncrement();
-                incrementCounterInStorage( queueName, type, totalIncrement );
-                inMemoryCount.setBaseCount( retrieveCounterFromStorage( 
queueName, type ) );
-                inMemoryCount.clearDeltas();
-            }
-        }
+        saveIfNeeded( queueName, type );
 
         return inMemoryCounters.get( key ).value();
     }
@@ -331,22 +319,27 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
 
         InMemoryCount inMemoryCount = inMemoryCounters.get( key );
 
-        if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {
+        if ( inMemoryCount.needsUpdate() || numChanges.incrementAndGet() > 
maxChangesBeforeSave ) {
 
             long totalIncrement = inMemoryCount.getIncrement();
-            incrementCounterInStorage( queueName, type, totalIncrement );
-
             long totalDecrement = inMemoryCount.getDecrement();
+            long baseCount = retrieveCounterFromStorage( queueName, type );
+
+            logger.debug("Saved counter {} type {} to storage baseCount {} inc 
{} dec {}",
+                queueName, type, baseCount, totalIncrement, totalDecrement );
+
+            incrementCounterInStorage( queueName, type, totalIncrement );
             decrementCounterInStorage( queueName, type, totalDecrement );
 
-            long baseCount = retrieveCounterFromStorage( queueName, type );
+            baseCount = retrieveCounterFromStorage( queueName, type );
 
-            logger.debug("Writing queue counter {} type {} to storage count = 
{}", queueName, type, baseCount );
+            logger.debug("Saved counter {} type {} to storage baseCount {}", 
queueName, type, baseCount );
 
             inMemoryCount.setBaseCount( baseCount );
             inMemoryCount.clearDeltas();
 
             numChanges.set( 0 );
+
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 053c093..da5d166 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -125,7 +125,6 @@ public class QueueActorServiceTest extends AbstractTest {
         DistributedQueueService distributedQueueService = 
injector.getInstance( DistributedQueueService.class );
         QueueMessageSerialization serialization         = 
injector.getInstance( QueueMessageSerialization.class );
         TransferLogSerialization xferLogSerialization   = 
injector.getInstance( TransferLogSerialization.class );
-        InMemoryQueue inMemoryQueue                     = 
injector.getInstance( InMemoryQueue.class );
         QueueMessageManager queueMessageManager         = 
injector.getInstance( QueueMessageManager.class );
 
         String queueName = "queue_testGetMultipleQueueMessages_" + 
UUID.randomUUID();
@@ -186,4 +185,70 @@ public class QueueActorServiceTest extends AbstractTest {
             queueManager.deleteQueue( queueName );
         }
     }
+
+
+    @Test
+    public void testQueueMessageCounter() throws InterruptedException {
+
+        Injector injector = getInjector();
+
+        ActorSystemFig actorSystemFig = injector.getInstance( 
ActorSystemFig.class );
+        String region = actorSystemFig.getRegionLocal();
+
+        App app = injector.getInstance( App.class );
+        app.start("localhost", getNextAkkaPort(), region);
+
+        DistributedQueueService distributedQueueService = 
injector.getInstance( DistributedQueueService.class );
+        QueueMessageSerialization serialization         = 
injector.getInstance( QueueMessageSerialization.class );
+        TransferLogSerialization xferLogSerialization   = 
injector.getInstance( TransferLogSerialization.class );
+        QueueMessageManager queueMessageManager         = 
injector.getInstance( QueueMessageManager.class );
+
+        String queueName = "queue_testGetMultipleQueueMessages_" + 
UUID.randomUUID();
+        QueueManager queueManager = injector.getInstance( QueueManager.class );
+
+        try {
+
+            queueManager.createQueue(
+                new Queue( queueName, "test-type", region, region, 0L, 5, 10, 
null ) );
+
+            UUID messageId = UUIDGen.getTimeUUID();
+
+            final String data = "my test data";
+            final DatabaseQueueMessageBody messageBody = new 
DatabaseQueueMessageBody(
+                DataType.serializeValue( data, 
ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
+            serialization.writeMessageData( messageId, messageBody );
+
+            xferLogSerialization.recordTransferLog(
+                queueName, actorSystemFig.getRegionLocal(), region, messageId 
);
+
+            distributedQueueService.sendMessageToRegion(
+                queueName, region, region, messageId, null, null );
+
+            DatabaseQueueMessage.Type type = DatabaseQueueMessage.Type.DEFAULT;
+
+            Thread.sleep(5000);
+
+            int maxRetries = 10;
+            int retries = 0;
+            long count = 0;
+            while (retries++ < maxRetries) {
+                distributedQueueService.refresh();
+                count = queueMessageManager.getQueueDepth(  queueName, type );
+                if ( count > 0 ) {
+                    break;
+                }
+                Thread.sleep( 1000 );
+            }
+
+            Thread.sleep( 1000 );
+
+            Assert.assertEquals( 1, queueMessageManager.getQueueDepth( 
queueName, type ) );
+
+            distributedQueueService.shutdown();
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index ecacccc..0ac1537 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -212,8 +212,8 @@ public class ShardAllocatorTest extends AbstractTest {
 
             // Test that approximately right number of shards created
             int shardCount = countShards( cassandraClient, shardCounterSer, 
queueName, region, Shard.Type.DEFAULT );
-            Assert.assertTrue( "shards > 7", shardCount > 7 );
-            Assert.assertTrue( "shards < 17", shardCount < 17 );
+            Assert.assertTrue( shardCount + " is too few shards", shardCount > 
7 );
+            Assert.assertTrue( shardCount + " is too many shards", shardCount 
< 17 );
 
         } finally {
             queueManager.deleteQueue( queueName );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
index 88d89de..3c09027 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
@@ -24,16 +24,13 @@ import com.datastax.driver.core.ProtocolVersion;
 import com.google.inject.Injector;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.persistence.qakka.AbstractTest;
-import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
 import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
 import org.junit.Test;
 
 import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index 464b48d..0c8e686 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -36,6 +36,8 @@ usergrid.cluster.seeds=us-east:localhost
 # Port used for cluster communications.
 usergrid.cluster.port=3545
 
+queue.inmemory.cache=false
+
 queue.num.actors=50
 queue.sender.num.actors=100
 queue.writer.num.actors=100

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
index 16e77d0..6345687 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
@@ -191,7 +191,7 @@ public class QueueSystemResource extends AbstractContextResource {
         @PathParam("queueName") String queueName,
         @QueryParam("callback") @DefaultValue("callback") String callback ) {
 
-        logger.debug("DMJ_TEMP clearQueue");
+        logger.debug("clearQueue");
 
         QueueManager queueManager = injector.getInstance( QueueManager.class );
         QueueMessageManager queueMessageManager = injector.getInstance( 
QueueMessageManagerImpl.class );

Reply via email to