Add timeout to shard and message counters, in addition to existing "number of changes before write" logic.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/521047c1 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/521047c1 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/521047c1 Branch: refs/heads/master Commit: 521047c19504c3b1722dd27a2f3b808c612b4a34 Parents: 1d5e407 Author: Dave Johnson <snoopd...@apache.org> Authored: Thu Nov 3 17:50:08 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Thu Nov 3 17:50:08 2016 -0400 ---------------------------------------------------------------------- .../usergrid/persistence/qakka/QakkaFig.java | 17 +++++++++ .../impl/MessageCounterSerializationImpl.java | 37 ++++++++++++++------ .../impl/ShardCounterSerializationImpl.java | 22 ++++++++++++ 3 files changed, 66 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/521047c1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java index 87b5c83..105c193 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -53,8 +53,10 @@ public interface QakkaFig extends GuicyFig, Serializable { String QUEUE_GET_TIMEOUT = "queue.get.timeout.seconds"; String QUEUE_SHARD_COUNTER_MAX_IN_MEMORY = "queue.shard.counter.max-in-memory"; + String QUEUE_SHARD_COUNTER_WRITE_TIMEOUT = "queue.shard.counter.write-timeout"; String QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY = "queue.message.counter.max-in-memory"; + String QUEUE_MESSAGE_COUNTER_WRITE_TIMEOUT = "queue.message.counter.write-timeout"; String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = "queue.shard.allocation.check.frequency.millis"; @@ -66,6 +68,9 @@ public interface QakkaFig extends GuicyFig, Serializable { String QUEUE_MAX_TTL = "queue.max.ttl"; + String QUEUE_IN_MEMORY = "queue.in-memory.cache"; + + /** True if Qakka is running standlone */ @Key(QUEUE_STANDALONE) @@ -127,11 +132,19 @@ public interface QakkaFig extends GuicyFig, Serializable { @Default("100") long getShardCounterMaxInMemory(); + @Key(QUEUE_SHARD_COUNTER_WRITE_TIMEOUT) + @Default("5000") + long getShardCounterWriteTimeoutMillis(); + /** Once counter reaches this value, write it to permanent storage */ @Key(QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY) @Default("100") long getMessageCounterMaxInMemory(); + @Key(QUEUE_MESSAGE_COUNTER_WRITE_TIMEOUT) + @Default("5000") + long getMessageCounterWriteTimeoutMillis(); + /** How often to check whether new shard is needed for each queue */ @Key(QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY) @Default("5000") @@ -155,4 +168,8 @@ public interface QakkaFig extends GuicyFig, Serializable { @Key(QUEUE_MAX_TTL) @Default("1209600") // default is two weeks int getMaxTtlSeconds(); + + @Key(QUEUE_IN_MEMORY) + @Default("false") + boolean getInMemoryCache(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/521047c1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java index 5c66f43..769f584 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java @@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardCounterSerializationImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +55,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat private final CassandraClient cassandraClient; private final CassandraConfig cassandraConfig; + private final long writeTimeout; final static String TABLE_MESSAGE_COUNTERS = "message_counters"; final static String COLUMN_QUEUE_NAME = "queue_name"; @@ -82,17 +84,18 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat final AtomicLong totalInMemoryCount = new AtomicLong( 0L ); // for testing using only in-memory counter final AtomicLong increment = new AtomicLong( 0L ); final AtomicLong decrement = new AtomicLong( 0L ); + long lastWritten = 0L; InMemoryCount( long baseCount ) { this.baseCount = baseCount; } public void increment( long inc ) { - increment.addAndGet( inc ); - totalInMemoryCount.addAndGet( inc ); + this.increment.addAndGet( inc ); + this.totalInMemoryCount.addAndGet( inc ); } public void decrement( long dec ) { - decrement.addAndGet( dec ); - totalInMemoryCount.addAndGet( -dec ); + this.decrement.addAndGet( dec ); + this.totalInMemoryCount.addAndGet( -dec ); } public long getIncrement() { return increment.get(); @@ -101,8 +104,11 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat return decrement.get(); } public void clearDeltas() { - increment.set( 0L ); - decrement.set( 0L ); + this.increment.set( 0L ); + this.decrement.set( 0L ); + } + boolean needsUpdate() { + return System.currentTimeMillis() - lastWritten > writeTimeout; } void reset() { this.baseCount = 0; @@ -110,13 +116,12 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat this.decrement.set( 0L ); } public long value() { - // return totalInMemoryCount.get(); // for testing using just in-memory counter: - return baseCount + increment.get() - decrement.get(); } void setBaseCount( long baseCount ) { this.baseCount = baseCount; + this.lastWritten = System.currentTimeMillis(); } } @@ -130,6 +135,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat this.cassandraConfig = cassandraConfig; this.maxChangesBeforeSave = qakkaFig.getMessageCounterMaxInMemory(); this.cassandraClient = cassandraClient; + this.writeTimeout = qakkaFig.getShardCounterWriteTimeoutMillis(); } @@ -227,8 +233,19 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat } } - long value = inMemoryCounters.get( key ).value(); - return value; + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + + synchronized ( inMemoryCount ) { + + if ( inMemoryCount.needsUpdate() ) { + long totalIncrement = inMemoryCount.getIncrement(); + incrementCounterInStorage( queueName, type, totalIncrement ); + inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) ); + inMemoryCount.clearDeltas(); + } + } + + return inMemoryCounters.get( key ).value(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/521047c1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java index 8bb262b..16e31a1 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java @@ -52,6 +52,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization private final CassandraClient cassandraClient; private final CassandraConfig cassandraConfig; + private final long writeTimeout; final static String TABLE_COUNTERS = "shard_counters"; final static String COLUMN_QUEUE_NAME = "queue_name"; @@ -72,7 +73,9 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization class InMemoryCount { long baseCount; + long lastWritten = 0L; final AtomicLong increment = new AtomicLong( 0L ); + InMemoryCount( long baseCount ) { this.baseCount = baseCount; } @@ -84,10 +87,15 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization } void setBaseCount( long baseCount ) { this.baseCount = baseCount; + this.lastWritten = System.currentTimeMillis(); + } + boolean needsUpdate() { + return System.currentTimeMillis() - lastWritten > writeTimeout; } void reset() { this.baseCount = 0; this.increment.set( 0L ); + this.lastWritten = System.currentTimeMillis(); } } @@ -97,9 +105,11 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization @Inject public ShardCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) { + this.cassandraConfig = cassandraConfig; this.maxInMemoryIncrement = qakkaFig.getShardCounterMaxInMemory(); this.cassandraClient = cassandraClient; + this.writeTimeout = qakkaFig.getShardCounterWriteTimeoutMillis(); } @@ -160,6 +170,18 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization } } + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + + synchronized ( inMemoryCount ) { + + if ( inMemoryCount.needsUpdate() ) { + long totalIncrement = inMemoryCount.getIncrement().get(); + incrementCounterInStorage( queueName, type, shardId, totalIncrement ); + inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type, shardId ) ); + inMemoryCount.getIncrement().set( 0L ); + } + } + return inMemoryCounters.get( key ).value(); }