Another counter concurrency fix, plus test stabilization changes

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/041109fb
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/041109fb
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/041109fb

Branch: refs/heads/master
Commit: 041109fb16287d1e6194f29658d7445c8058fc90
Parents: 9f2863f
Author: Dave Johnson <snoopd...@apache.org>
Authored: Tue Oct 11 08:55:12 2016 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Tue Oct 11 08:55:12 2016 -0400

----------------------------------------------------------------------
 .../impl/MessageCounterSerializationImpl.java   | 81 ++++++++++----------
 .../distributed/QueueActorServiceTest.java      | 17 ++--
 .../queue/src/test/resources/qakka.properties   |  3 +
 3 files changed, 53 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/041109fb/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index 2eb482a..ee4bab2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -123,25 +123,29 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
 
         synchronized ( inMemoryCounters ) {
 
-            if ( inMemoryCounters.get( key ) == null ) {
+            if (inMemoryCounters.get( key ) == null) {
 
                 Long value = retrieveCounterFromStorage( queueName, type );
 
-                if ( value == null ) {
+                if (value == null) {
                     incrementCounterInStorage( queueName, type, 0L );
-                    inMemoryCounters.put( key, new InMemoryCount( 0L ));
+                    inMemoryCounters.put( key, new InMemoryCount( 0L ) );
                 } else {
-                    inMemoryCounters.put( key, new InMemoryCount( value ));
+                    inMemoryCounters.put( key, new InMemoryCount( value ) );
                 }
             }
+        }
+
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
 
-            InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+        synchronized ( inMemoryCount ) {
             inMemoryCount.getIncrement().addAndGet( increment );
 
-//            logger.info("Incremented Count for queue {} type {} = {}",
-//                queueName, type, getCounterValue( queueName, type ));
+            //logger.info("Incremented Count for queue {} type {} = {}",
+            //queueName, type, getCounterValue( queueName, type ));
+
+            saveIfNeeded( queueName, type );
         }
-        saveIfNeeded( queueName, type );
     }
 
 
@@ -152,25 +156,30 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
 
         synchronized ( inMemoryCounters ) {
 
-            if ( inMemoryCounters.get( key ) == null ) {
+            if (inMemoryCounters.get( key ) == null) {
 
                 Long value = retrieveCounterFromStorage( queueName, type );
 
-                if ( value == null ) {
+                if (value == null) {
                     decrementCounterInStorage( queueName, type, 0L );
-                    inMemoryCounters.put( key, new InMemoryCount( 0L ));
+                    inMemoryCounters.put( key, new InMemoryCount( 0L ) );
                 } else {
-                    inMemoryCounters.put( key, new InMemoryCount( value ));
+                    inMemoryCounters.put( key, new InMemoryCount( value ) );
                 }
             }
+        }
+
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+
+        synchronized ( inMemoryCount ) {
 
-            InMemoryCount inMemoryCount = inMemoryCounters.get( key );
             inMemoryCount.getDecrement().addAndGet( decrement );
 
-//            logger.info("Decremented Count for queue {} type {} = {}",
-//                queueName, type, getCounterValue( queueName, type ));
+            //logger.info("Decremented Count for queue {} type {} = {}",
+                //queueName, type, getCounterValue( queueName, type ));
+
+            saveIfNeeded( queueName, type );
         }
-        saveIfNeeded( queueName, type );
     }
 
 
@@ -179,19 +188,16 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
 
         String key = buildKey( queueName, type );
 
-        synchronized ( inMemoryCounters ) {
+        if ( inMemoryCounters.get( key ) == null ) {
 
-            if ( inMemoryCounters.get( key ) == null ) {
+            Long value = retrieveCounterFromStorage( queueName, type );
 
-                Long value = retrieveCounterFromStorage( queueName, type );
-
-                if ( value == null ) {
-                    throw new NotFoundException(
-                            MessageFormat.format( "No counter found for queue {0} type {1}",
-                                    queueName, type ));
-                } else {
-                    inMemoryCounters.put( key, new InMemoryCount( value ));
-                }
+            if ( value == null ) {
+                throw new NotFoundException(
+                        MessageFormat.format( "No counter found for queue {0} type {1}",
+                                queueName, type ));
+            } else {
+                inMemoryCounters.put( key, new InMemoryCount( value ));
             }
         }
 
@@ -245,22 +251,19 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
 
         InMemoryCount inMemoryCount = inMemoryCounters.get( key );
 
-        synchronized ( inMemoryCount ) {
-
-            if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {
+        if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {
 
-                long totalIncrement = inMemoryCount.getIncrement().get();
-                incrementCounterInStorage( queueName, type, totalIncrement );
+            long totalIncrement = inMemoryCount.getIncrement().get();
+            incrementCounterInStorage( queueName, type, totalIncrement );
 
-                long totalDecrement = inMemoryCount.getDecrement().get();
-                decrementCounterInStorage( queueName, type, totalDecrement );
+            long totalDecrement = inMemoryCount.getDecrement().get();
+            decrementCounterInStorage( queueName, type, totalDecrement );
 
-                inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
-                inMemoryCount.getIncrement().set( 0L );
-                inMemoryCount.getDecrement().set( 0L );
+            inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
+            inMemoryCount.getIncrement().set( 0L );
+            inMemoryCount.getDecrement().set( 0L );
 
-                numChanges.set( 0 );
-            }
+            numChanges.set( 0 );
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/041109fb/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 7fe8b16..f5512e5 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -152,32 +152,31 @@ public class QueueActorServiceTest extends AbstractTest {
                     queueName, region, region, messageId, null, null );
             }
 
-            int maxRetries = 25;
+            int maxRetries = 10;
             int retries = 0;
-            int count = 0;
+            long count = 0;
             while (retries++ < maxRetries) {
                 distributedQueueService.refresh();
-                if ( queueMessageManager.getQueueDepth(  queueName ) == 100 ) {
-                    count = 100;
+                count = queueMessageManager.getQueueDepth(  queueName );
+                if ( count == 100 ) {
                     break;
                 }
-                count = inMemoryQueue.size( queueName );
                 Thread.sleep( 1000 );
             }
 
             Assert.assertEquals( 100, count );
 
             Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-            Assert.assertEquals( 75, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 75, queueMessageManager.getQueueDepth(  queueName ) );
 
             Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-            Assert.assertEquals( 50, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 50, queueMessageManager.getQueueDepth(  queueName ) );
 
             Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-            Assert.assertEquals( 25, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 25, queueMessageManager.getQueueDepth(  queueName ) );
 
             Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-            Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 0,  queueMessageManager.getQueueDepth(  queueName ) );
 
             distributedQueueService.shutdown();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/041109fb/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index 142138d..95b2509 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -40,6 +40,9 @@ queue.num.actors=50
 queue.sender.num.actors=100
 queue.writer.num.actors=100
 
+queue.send.timeout.seconds=5
+queue.get.timeout.seconds=5
+
 # set shard size and times low for testing purposes
 queue.shard.max.size=10
 queue.shard.allocation.check.frequency.millis=100

Reply via email to