Minor refactoring, renaming and debug logging changes.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8c3a2dd Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8c3a2dd Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8c3a2dd Branch: refs/heads/usergrid-1318-queue Commit: f8c3a2dd87495cd4f65a455f46a187d9c7badd27 Parents: f56e1b0 Author: Dave Johnson <snoopd...@apache.org> Authored: Thu Oct 13 10:12:33 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Thu Oct 13 10:12:33 2016 -0400 ---------------------------------------------------------------------- .../asyncevents/AsyncEventsSchedulerFig.java | 9 +-- .../usergrid/persistence/qakka/QakkaFig.java | 12 ++-- .../distributed/actors/ShardAllocator.java | 5 ++ .../impl/DistributedQueueServiceImpl.java | 6 +- .../impl/MessageCounterSerializationImpl.java | 72 +++++++++++++------- .../impl/QueueMessageSerializationImpl.java | 2 +- .../impl/ShardCounterSerializationImpl.java | 2 +- .../queue/src/test/resources/log4j.properties | 5 +- .../queue/src/test/resources/qakka.properties | 5 +- 9 files changed, 70 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java index f696568..e556870 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java @@ -24,13 +24,9 @@ import org.safehaus.guicyfig.GuicyFig; import org.safehaus.guicyfig.Key; -/** - * - */ @FigSingleton public interface AsyncEventsSchedulerFig extends GuicyFig { - /** * Amount of threads to use in async processing */ @@ -42,25 +38,22 @@ public interface AsyncEventsSchedulerFig extends GuicyFig { */ String IO_SCHEDULER_NAME = "scheduler.io.poolName"; - /** * Amount of threads to use in async processing */ String REPAIR_SCHEDULER_THREADS = "repair.io.threads"; - /** * Name of pool to use when performing scheduling */ String REPAIR_SCHEDULER_NAME = "repair.io.poolName"; - @Default( "40" ) @Key( IO_SCHEDULER_THREADS ) int getMaxIoThreads(); - @Default( "Usergrid-SQS-Pool" ) + @Default( "Usergrid-Queue-Worker-Pool" ) @Key( IO_SCHEDULER_NAME ) String getIoSchedulerName(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java index 3093c39..e6a8667 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -52,9 +52,9 @@ public interface QakkaFig extends GuicyFig, Serializable { String QUEUE_GET_TIMEOUT = "queue.get.timeout.seconds"; - String QUEUE_MAX_SHARD_COUNTER = "queue.max.inmemory.max.shard.counter"; + String QUEUE_SHARD_COUNTER_MAX_IN_MEMORY = "queue.shard.counter.max-in-memory"; - String QUEUE_MAX_MESSAGE_CHANGES = "queue.max.inmemory.max.message.changes"; + String QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY = "queue.message.counter.max-in-memory"; String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = "queue.shard.allocation.check.frequency.millis"; @@ -123,14 +123,14 @@ public interface QakkaFig extends GuicyFig, Serializable { int getSendTimeoutSeconds(); /** Once counter reaches this value, write it to permanent storage */ - @Key(QUEUE_MAX_SHARD_COUNTER) + @Key(QUEUE_SHARD_COUNTER_MAX_IN_MEMORY) @Default("100") - long getMaxInMemoryShardCounter(); + long getShardCounterMaxInMemory(); /** Once counter reaches this value, write it to permanent storage */ - @Key(QUEUE_MAX_MESSAGE_CHANGES) + @Key(QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY) @Default("100") - long getMaxInMemoryMessageCounter(); + long getMessageCounterMaxInMemory(); /** How often to check whether new shard is needed for each queue */ @Key(QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY) http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java index 1863472..19059e6 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java @@ -125,6 +125,7 @@ public class ShardAllocator extends UntypedActor { long counterValue = 0; try { counterValue = shardCounterSerialization.getCounterValue( queueName, type, shard.getShardId() ); + } catch ( NotFoundException ignored ) {} if (counterValue > (0.9 * qakkaFig.getMaxShardSize())) { @@ -140,6 +141,10 @@ public class ShardAllocator extends UntypedActor { logger.info("{} Created new shard for queue {} shardId {} timestamp {} counterValue {}", this.hashCode(), queueName, shard.getShardId(), futureUUID.timestamp(), counterValue ); + + } else { +// logger.debug("No new shard for queue {} counterValue {} of max {}", +// queueName, counterValue, qakkaFig.getMaxShardSize() ); } } catch ( Throwable t ) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index 20bf608..e2c5c2c 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -241,9 +241,9 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { } } - if ( ret.isEmpty() ) { - logger.info( "Requested {} but queue '{}' is empty", count, queueName); - } +// if ( ret.isEmpty() ) { +// logger.info( "Requested {} but queue '{}' is empty", count, queueName); +// } return ret; } finally { http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java index ee4bab2..36175c5 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java @@ -78,19 +78,35 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat class InMemoryCount { long baseCount; + final AtomicLong totalInMemoryCount = new AtomicLong( 0L ); // for testing using only in-memory counter final AtomicLong increment = new AtomicLong( 0L ); final AtomicLong decrement = new AtomicLong( 0L ); InMemoryCount( long baseCount ) { this.baseCount = baseCount; } - public AtomicLong getIncrement() { - return increment; + public void increment( long inc ) { + increment.addAndGet( inc ); + totalInMemoryCount.addAndGet( inc ); } - public AtomicLong getDecrement() { - return decrement; + public void decrement( long dec ) { + decrement.addAndGet( dec ); + totalInMemoryCount.addAndGet( -dec ); + } + public long getIncrement() { + return increment.get(); + } + public long getDecrement() { + return decrement.get(); + } + public void clearDeltas() { + increment.set( 0L ); + decrement.set( 0L ); } public long value() { + + // return totalInMemoryCount.get(); // for testing using just in-memory counter: + return baseCount + increment.get() - decrement.get(); } void setBaseCount( long baseCount ) { @@ -106,7 +122,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) { this.cassandraConfig = cassandraConfig; - this.maxChangesBeforeSave = qakkaFig.getMaxInMemoryMessageCounter(); + this.maxChangesBeforeSave = qakkaFig.getMessageCounterMaxInMemory(); this.cassandraClient = cassandraClient; } @@ -139,13 +155,16 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat InMemoryCount inMemoryCount = inMemoryCounters.get( key ); synchronized ( inMemoryCount ) { - inMemoryCount.getIncrement().addAndGet( increment ); - - //logger.info("Incremented Count for queue {} type {} = {}", - //queueName, type, getCounterValue( queueName, type )); - + inMemoryCount.increment( increment ); saveIfNeeded( queueName, type ); } + + if ( logger.isDebugEnabled() ) { + long value = inMemoryCounters.get( key ).value(); + if (value <= 0) { + logger.debug( "Queue {} type {} decremented count = {}", queueName, type, value ); + } + } } @@ -172,14 +191,16 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat InMemoryCount inMemoryCount = inMemoryCounters.get( key ); synchronized ( inMemoryCount ) { - - inMemoryCount.getDecrement().addAndGet( decrement ); - - //logger.info("Decremented Count for queue {} type {} = {}", - //queueName, type, getCounterValue( queueName, type )); - + inMemoryCount.decrement( decrement ); saveIfNeeded( queueName, type ); } + + if ( logger.isDebugEnabled() ) { + long value = inMemoryCounters.get( key ).value(); + if (value <= 0) { + logger.debug( "Queue {} type {} incremented count = {}", queueName, type, value ); + } + } } @@ -194,14 +215,14 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat if ( value == null ) { throw new NotFoundException( - MessageFormat.format( "No counter found for queue {0} type {1}", - queueName, type )); + MessageFormat.format( "No counter found for queue {0} type {1}", queueName, type )); } else { inMemoryCounters.put( key, new InMemoryCount( value )); } } - return inMemoryCounters.get( key ).value(); + long value = inMemoryCounters.get( key ).value(); + return value; } @@ -253,15 +274,18 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) { - long totalIncrement = inMemoryCount.getIncrement().get(); + long totalIncrement = inMemoryCount.getIncrement(); incrementCounterInStorage( queueName, type, totalIncrement ); - long totalDecrement = inMemoryCount.getDecrement().get(); + long totalDecrement = inMemoryCount.getDecrement(); decrementCounterInStorage( queueName, type, totalDecrement ); - inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) ); - inMemoryCount.getIncrement().set( 0L ); - inMemoryCount.getDecrement().set( 0L ); + long baseCount = retrieveCounterFromStorage( queueName, type ); + + logger.debug("Writing queue counter {} type {} to storage count = {}", queueName, type, baseCount ); + + inMemoryCount.setBaseCount( baseCount ); + inMemoryCount.clearDeltas(); numChanges.set( 0 ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java index 708132c..a174dd0 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java @@ -179,7 +179,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ? Shard.Type.DEFAULT : Shard.Type.INFLIGHT; Shard shard = shardStrategy.selectShard( - queueName, actorSystemFig.getRegionLocal(), shardType, queueMessageId ); + queueName, region, shardType, queueMessageId ); shardId = shard.getShardId(); } else { shardId = shardIdOrNull; http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java index f303f43..276498d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java @@ -94,7 +94,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization public ShardCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) { this.cassandraConfig = cassandraConfig; - this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter(); + this.maxInMemoryIncrement = qakkaFig.getShardCounterMaxInMemory(); this.cassandraClient = cassandraClient; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties index c7d53a3..d542096 100644 --- a/stack/corepersistence/queue/src/test/resources/log4j.properties +++ b/stack/corepersistence/queue/src/test/resources/log4j.properties @@ -24,7 +24,6 @@ log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) %c{1} - %m%n log4j.logger.org.apache.cassandra=WARN log4j.logger.org.glassfish=WARN -log4j.logger.org.apache.usergrid=INFO -#log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG -#log4j.logger.org.apache.usergrid.persistence.queue=DEBUG +log4j.logger.org.apache.usergrid.persistence.qakka=INFO +log4j.logger.org.apache.usergrid.persistence.queue=INFO log4j.logger.org.apache.usergrid.corepersistence.asyncevents=INFO http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/test/resources/qakka.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties index 95b2509..464b48d 100644 --- a/stack/corepersistence/queue/src/test/resources/qakka.properties +++ b/stack/corepersistence/queue/src/test/resources/qakka.properties @@ -48,8 +48,9 @@ queue.shard.max.size=10 queue.shard.allocation.check.frequency.millis=100 queue.shard.allocation.advance.time.millis=200 -queue.max.inmemory.shard.counter = 100 -queue.max.inmemory.max.message.changes=3 +# set low for testing purposes +queue.shard.counter.max-in-memory=10 +queue.message.counter.max-in-memory=10 queue.long.polling.time.millis=2000