Add timeout to shard and message counters, in addition to existing "number of 
changes before write" logic.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/521047c1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/521047c1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/521047c1

Branch: refs/heads/master
Commit: 521047c19504c3b1722dd27a2f3b808c612b4a34
Parents: 1d5e407
Author: Dave Johnson <snoopd...@apache.org>
Authored: Thu Nov 3 17:50:08 2016 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Thu Nov 3 17:50:08 2016 -0400

----------------------------------------------------------------------
 .../usergrid/persistence/qakka/QakkaFig.java    | 17 +++++++++
 .../impl/MessageCounterSerializationImpl.java   | 37 ++++++++++++++------
 .../impl/ShardCounterSerializationImpl.java     | 22 ++++++++++++
 3 files changed, 66 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/521047c1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 87b5c83..105c193 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -53,8 +53,10 @@ public interface QakkaFig extends GuicyFig, Serializable {
     String QUEUE_GET_TIMEOUT                      = 
"queue.get.timeout.seconds";
 
     String QUEUE_SHARD_COUNTER_MAX_IN_MEMORY      = 
"queue.shard.counter.max-in-memory";
+    String QUEUE_SHARD_COUNTER_WRITE_TIMEOUT      = 
"queue.shard.counter.write-timeout";
 
     String QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY    = 
"queue.message.counter.max-in-memory";
+    String QUEUE_MESSAGE_COUNTER_WRITE_TIMEOUT    = 
"queue.message.counter.write-timeout";
 
     String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = 
"queue.shard.allocation.check.frequency.millis";
 
@@ -66,6 +68,9 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     String QUEUE_MAX_TTL                          = "queue.max.ttl";
 
+    String QUEUE_IN_MEMORY                        = "queue.in-memory.cache";
+
+
 
     /** True if Qakka is running standlone */
     @Key(QUEUE_STANDALONE)
@@ -127,11 +132,19 @@ public interface QakkaFig extends GuicyFig, Serializable {
     @Default("100")
     long getShardCounterMaxInMemory();
 
+    @Key(QUEUE_SHARD_COUNTER_WRITE_TIMEOUT)
+    @Default("5000")
+    long getShardCounterWriteTimeoutMillis();
+
     /** Once counter reaches this value, write it to permanent storage */
     @Key(QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY)
     @Default("100")
     long getMessageCounterMaxInMemory();
 
+    @Key(QUEUE_MESSAGE_COUNTER_WRITE_TIMEOUT)
+    @Default("5000")
+    long getMessageCounterWriteTimeoutMillis();
+
     /** How often to check whether new shard is needed for each queue */
     @Key(QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY)
     @Default("5000")
@@ -155,4 +168,8 @@ public interface QakkaFig extends GuicyFig, Serializable {
     @Key(QUEUE_MAX_TTL)
     @Default("1209600") // default is two weeks
     int getMaxTtlSeconds();
+
+    @Key(QUEUE_IN_MEMORY)
+    @Default("false")
+    boolean getInMemoryCache();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/521047c1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index 5c66f43..769f584 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -35,6 +35,7 @@ import 
org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardCounterSerializationImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +55,7 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
 
     private final CassandraClient cassandraClient;
     private final CassandraConfig cassandraConfig;
+    private final long writeTimeout;
 
     final static String TABLE_MESSAGE_COUNTERS = "message_counters";
     final static String COLUMN_QUEUE_NAME      = "queue_name";
@@ -82,17 +84,18 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
         final AtomicLong totalInMemoryCount = new AtomicLong( 0L ); // for 
testing using only in-memory counter
         final AtomicLong increment = new AtomicLong( 0L );
         final AtomicLong decrement = new AtomicLong( 0L );
+        long lastWritten = 0L;
 
         InMemoryCount( long baseCount ) {
             this.baseCount = baseCount;
         }
         public void increment( long inc ) {
-            increment.addAndGet( inc );
-            totalInMemoryCount.addAndGet( inc );
+            this.increment.addAndGet( inc );
+            this.totalInMemoryCount.addAndGet( inc );
         }
         public void decrement( long dec ) {
-            decrement.addAndGet( dec );
-            totalInMemoryCount.addAndGet( -dec );
+            this.decrement.addAndGet( dec );
+            this.totalInMemoryCount.addAndGet( -dec );
         }
         public long getIncrement() {
             return increment.get();
@@ -101,8 +104,11 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
             return decrement.get();
         }
         public void clearDeltas() {
-            increment.set( 0L );
-            decrement.set( 0L );
+            this.increment.set( 0L );
+            this.decrement.set( 0L );
+        }
+        boolean needsUpdate() {
+            return System.currentTimeMillis() - lastWritten > writeTimeout;
         }
         void reset() {
             this.baseCount = 0;
@@ -110,13 +116,12 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
             this.decrement.set( 0L );
         }
         public long value() {
-
             // return totalInMemoryCount.get(); // for testing using just 
in-memory counter:
-
             return baseCount + increment.get() - decrement.get();
         }
         void setBaseCount( long baseCount ) {
             this.baseCount = baseCount;
+            this.lastWritten = System.currentTimeMillis();
         }
     }
 
@@ -130,6 +135,7 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
         this.cassandraConfig = cassandraConfig;
         this.maxChangesBeforeSave = qakkaFig.getMessageCounterMaxInMemory();
         this.cassandraClient = cassandraClient;
+        this.writeTimeout = qakkaFig.getShardCounterWriteTimeoutMillis();
     }
 
 
@@ -227,8 +233,19 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
             }
         }
 
-        long value = inMemoryCounters.get( key ).value();
-        return value;
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+
+        synchronized ( inMemoryCount ) {
+
+            if ( inMemoryCount.needsUpdate() ) {
+                long totalIncrement = inMemoryCount.getIncrement();
+                incrementCounterInStorage( queueName, type, totalIncrement );
+                inMemoryCount.setBaseCount( retrieveCounterFromStorage( 
queueName, type ) );
+                inMemoryCount.clearDeltas();
+            }
+        }
+
+        return inMemoryCounters.get( key ).value();
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/521047c1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
index 8bb262b..16e31a1 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
@@ -52,6 +52,7 @@ public class ShardCounterSerializationImpl implements 
ShardCounterSerialization
 
     private final CassandraClient cassandraClient;
     private final CassandraConfig cassandraConfig;
+    private final long writeTimeout;
 
     final static String TABLE_COUNTERS       = "shard_counters";
     final static String COLUMN_QUEUE_NAME    = "queue_name";
@@ -72,7 +73,9 @@ public class ShardCounterSerializationImpl implements 
ShardCounterSerialization
 
     class InMemoryCount {
         long baseCount;
+        long lastWritten = 0L;
         final AtomicLong increment = new AtomicLong( 0L );
+
         InMemoryCount( long baseCount ) {
             this.baseCount = baseCount;
         }
@@ -84,10 +87,15 @@ public class ShardCounterSerializationImpl implements 
ShardCounterSerialization
         }
         void setBaseCount( long baseCount ) {
             this.baseCount = baseCount;
+            this.lastWritten = System.currentTimeMillis();
+        }
+        boolean needsUpdate() {
+            return System.currentTimeMillis() - lastWritten > writeTimeout;
         }
         void reset() {
             this.baseCount = 0;
             this.increment.set( 0L );
+            this.lastWritten = System.currentTimeMillis();
         }
     }
 
@@ -97,9 +105,11 @@ public class ShardCounterSerializationImpl implements 
ShardCounterSerialization
     @Inject
     public ShardCounterSerializationImpl(
         CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient 
cassandraClient ) {
+
         this.cassandraConfig = cassandraConfig;
         this.maxInMemoryIncrement = qakkaFig.getShardCounterMaxInMemory();
         this.cassandraClient = cassandraClient;
+        this.writeTimeout = qakkaFig.getShardCounterWriteTimeoutMillis();
     }
 
 
@@ -160,6 +170,18 @@ public class ShardCounterSerializationImpl implements 
ShardCounterSerialization
             }
         }
 
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+
+        synchronized ( inMemoryCount ) {
+
+            if ( inMemoryCount.needsUpdate() ) {
+                long totalIncrement = inMemoryCount.getIncrement().get();
+                incrementCounterInStorage( queueName, type, shardId, 
totalIncrement );
+                inMemoryCount.setBaseCount( retrieveCounterFromStorage( 
queueName, type, shardId ) );
+                inMemoryCount.getIncrement().set( 0L );
+            }
+        }
+
         return inMemoryCounters.get( key ).value();
     }
 

Reply via email to