Repository: usergrid Updated Branches: refs/heads/usergrid-1318-queue 522a5515b -> 2a8679512
Minor formatting, logging and changes recommended by IntelliJ Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/ae164586 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/ae164586 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/ae164586 Branch: refs/heads/usergrid-1318-queue Commit: ae16458699a3f9d6b8ea9e2f4b851bb87b39558a Parents: 522a551 Author: Dave Johnson <snoopd...@apache.org> Authored: Wed Nov 9 10:43:00 2016 -0500 Committer: Dave Johnson <snoopd...@apache.org> Committed: Wed Nov 9 10:43:00 2016 -0500 ---------------------------------------------------------------------- .../asyncevents/AsyncEventService.java | 11 ++++--- .../actorsystem/ActorSystemManagerImpl.java | 12 ++++---- .../persistence/actorsystem/ClientActor.java | 2 +- .../persistence/core/CassandraConfigImpl.java | 6 ++++ .../distributed/actors/QueueActorRouter.java | 2 +- .../qakka/distributed/actors/QueueSender.java | 4 +-- .../distributed/actors/QueueTimeouter.java | 5 +++- .../qakka/distributed/actors/QueueWriter.java | 7 +++-- .../impl/DistributedQueueServiceImpl.java | 15 +++++++--- .../impl/MessageCounterSerializationImpl.java | 30 ++++++++++---------- 10 files changed, 58 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index ec84a0a..cab4e3e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -43,9 +43,11 @@ public interface AsyncEventService extends ReIndexAction { void queueInitializeApplicationIndex( final ApplicationScope applicationScope ); /** - * Queue an entity to be indexed. This will start processing immediately. For implementations that are realtime (akka, in memory) - * We will return a distributed future. For SQS impls, this will return immediately, and the result will not be available. + * Queue an entity to be indexed. This will start processing immediately. + * For implementations that are realtime (akka, in memory) We will return a distributed future. + * For SQS impls, this will return immediately, and the result will not be available. * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly. + * * @param applicationScope * @param entity The entity to index. Should be fired when an entity is updated * @param updatedAfter @@ -54,9 +56,10 @@ public interface AsyncEventService extends ReIndexAction { /** - * Fired when a new edge is added to an entity. Such as initial entity creation, adding to a collection, or creating a connection + * Fired when a new edge is added to an entity. Such as initial entity creation, + * adding to a collection, or creating a connection * - * TODO: We shouldn't take an entity here, only the id. It doesn't make sense in a distributed context to pass the entity + * TODO: We shouldn't take an entity here, only the id. It doesn't make sense in a distributed context * * @param applicationScope * @param entity http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index 96ebe69..c2f96af 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -396,7 +396,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { /** - * Create RequestActor for each region. + * Create ClientActor for each region. */ private void createClientActors( ActorSystem system ) { @@ -438,7 +438,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { private void waitForClientActor( ActorRef ra ) { - logger.info( "Waiting on RequestActor [{}]...", ra.path() ); + logger.info( "Waiting on ClientActor [{}]...", ra.path() ); started = false; @@ -455,19 +455,19 @@ public class ActorSystemManagerImpl implements ActorSystemManager { started = true; break; } - logger.info( "Waiting for RequestActor [{}] region [{}] for [{}s]", ra.path(), currentRegion, retries ); + logger.info( "Waiting for ClientActor [{}] region [{}] for [{}s]", ra.path(), currentRegion, retries ); Thread.sleep( 1000 ); } catch (Exception e) { - logger.error( "Error: Timeout waiting for RequestActor [{}]", ra.path() ); + logger.error( "Error: Timeout waiting for ClientActor [{}]", ra.path() ); } retries++; } if (started) { - logger.info( "RequestActor [{}] has started", ra.path() ); + logger.info( "ClientActor [{}] has started", ra.path() ); } else { - throw new RuntimeException( "RequestActor ["+ra.path()+"] did not start in time" ); + throw new RuntimeException( "ClientActor ["+ra.path()+"] did not start in time" ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java index c553a31..dba8c2e 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java @@ -82,7 +82,7 @@ public class ClientActor extends UntypedActor { ActorSelection service = getContext().actorSelection( address + routerPath ); service.tell( message, getSender() ); - } else if ( routerPath != null && !ready ) { + } else if ( routerPath != null ) { logger.debug("{} responding with status unknown", name); getSender().tell( new ErrorResponse("ClientActor not ready"), getSender() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java index 1503093..b0ce9e3 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java @@ -27,6 +27,8 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import com.netflix.astyanax.model.ConsistencyLevel; import org.apache.log4j.lf5.viewer.categoryexplorer.CategoryPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -35,6 +37,7 @@ import org.apache.log4j.lf5.viewer.categoryexplorer.CategoryPath; */ @Singleton public class CassandraConfigImpl implements CassandraConfig { + private static final Logger logger = LoggerFactory.getLogger( CassandraConfigImpl.class ); private CassandraFig cassandraFig; @@ -79,6 +82,9 @@ public class CassandraConfigImpl implements CassandraConfig { cassandraFig.getApplicationLocalKeyspace() + "_" + cassandraFig.getLocalDataCenter().replace("-", "_"); + logger.info("Application Keyspace: {}", applicationKeyspace); + logger.info("Application Local Keyspace: {}", applicationLocalKeyspace); + //add the listeners to update the values cassandraFig.addPropertyChangeListener( new PropertyChangeListener() { @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java index 71cf332..1ff8502 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java @@ -114,7 +114,7 @@ public class QueueActorRouter extends UntypedActor { if ( timeoutSchedulersByQueueName.get( queueName ) == null) { Cancellable scheduler = getContext().system().scheduler().schedule( - Duration.create( 0, TimeUnit.MILLISECONDS ), + Duration.create( 0, TimeUnit.SECONDS ), Duration.create( qakkaFig.getQueueTimeoutSeconds() / 2, TimeUnit.SECONDS ), self(), new QueueTimeoutRequest( queueName ), http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java index 461c28f..f584474 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java @@ -129,7 +129,7 @@ public class QueueSender extends UntypedActor { if (actorSystemManager.getCurrentRegion().equals( destRegion )) { - logger.trace("Sending queue {} message to local region {}", queueName, destRegion ); + logger.trace("{}: Sending queue {} message to local region {}", name, queueName, destRegion ); // send to current region via local clientActor ActorRef clientActor = actorSystemManager.getClientActor(); @@ -137,7 +137,7 @@ public class QueueSender extends UntypedActor { } else { - logger.trace("Sending queue {} message to remote region {}", queueName, destRegion ); + logger.trace("{} Sending queue {} message to remote region {}", name, queueName, destRegion ); // send to remote region via cluster client for that region ActorRef clusterClient = actorSystemManager.getClusterClient( destRegion ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java index 58afc76..546e638 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java @@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import akka.actor.UntypedActor; import com.codahale.metrics.Timer; import com.google.inject.Inject; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.qakka.MetricsService; import org.apache.usergrid.persistence.qakka.QakkaFig; @@ -43,6 +44,8 @@ import java.util.UUID; public class QueueTimeouter extends UntypedActor { private static final Logger logger = LoggerFactory.getLogger( QueueTimeouter.class ); + private final String name = RandomStringUtils.randomAlphanumeric( 4 ); + private final QueueMessageSerialization messageSerialization; private final MetricsService metricsService; private final ActorSystemFig actorSystemFig; @@ -105,7 +108,7 @@ public class QueueTimeouter extends UntypedActor { } if (count > 0) { - logger.debug( "Timed out {} messages for queue {}", count, queueName ); + logger.debug( "{}: Timed out {} messages for queue {}", name, count, queueName ); } } finally { http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java index c9be47f..b7f5401 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java @@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import akka.actor.UntypedActor; import com.codahale.metrics.Timer; import com.google.inject.Inject; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.usergrid.persistence.qakka.MetricsService; import org.apache.usergrid.persistence.qakka.core.QakkaUtils; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; @@ -45,6 +46,8 @@ public class QueueWriter extends UntypedActor { public enum WriteStatus { SUCCESS_XFERLOG_DELETED, SUCCESS_XFERLOG_NOTDELETED, ERROR }; + private final String name = RandomStringUtils.randomAlphanumeric( 4 ); + private final QueueMessageSerialization messageSerialization; private final TransferLogSerialization transferLogSerialization; private final AuditLogSerialization auditLogSerialization; @@ -97,8 +100,8 @@ public class QueueWriter extends UntypedActor { messageSerialization.writeMessage( dbqm ); - //logger.debug("Wrote queue message id {} to queue name {}", - // dbqm.getQueueMessageId(), dbqm.getQueueName()); + logger.trace("{}: Wrote queue message id {} to queue name {}", + name, dbqm.getQueueMessageId(), dbqm.getQueueName()); } catch (Throwable t) { logger.debug("Error creating database queue message", t); http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index 96e8cab..1784dc3 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -47,6 +47,7 @@ import scala.concurrent.Future; import java.lang.reflect.Method; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; @Singleton @@ -269,7 +270,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { logger.debug("ERROR RESPONSE (1) popping queue {}, retrying {}", queueName, tries ); } else { - logger.debug("TIMEOUT popping from queue {}, retrying {}", queueName, tries ); + logger.trace("TIMEOUT popping from queue {}, retrying {}", queueName, tries ); } } else if ( responseObject instanceof ClientActor.ErrorResponse ) { @@ -282,8 +283,11 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { logger.debug("UNKNOWN RESPONSE popping queue {}, retrying {}", queueName, tries ); } + } catch ( TimeoutException e ) { + logger.trace("TIMEOUT popping to queue " + queueName + " retrying " + tries, e ); + } catch ( Exception e ) { - logger.error("ERROR popping to queue " + queueName + " retrying " + tries, e ); + logger.debug("ERROR popping to queue " + queueName + " retrying " + tries, e ); } } @@ -335,12 +339,15 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { return qprm.getStatus(); } else if ( response != null ) { - logger.debug("ERROR RESPONSE sending message, retrying {}", retries ); + logger.debug("UNKNOWN RESPONSE sending message, retrying {}", retries ); } else { - logger.debug("TIMEOUT sending message, retrying {}", retries ); + logger.trace("TIMEOUT sending message, retrying {}", retries ); } + } catch ( TimeoutException e ) { + logger.trace( "TIMEOUT sending message, retrying " + retries, e ); + } catch ( Exception e ) { logger.debug("ERROR sending message, retrying " + retries, e ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/ae164586/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java index 769f584..2e7722d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java @@ -57,15 +57,15 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat private final CassandraConfig cassandraConfig; private final long writeTimeout; - final static String TABLE_MESSAGE_COUNTERS = "message_counters"; - final static String COLUMN_QUEUE_NAME = "queue_name"; - final static String COLUMN_COUNTER_VALUE = "counter_value"; - final static String COLUMN_MESSAGE_TYPE = "message_type"; + private final static String TABLE_MESSAGE_COUNTERS = "message_counters"; + private final static String COLUMN_QUEUE_NAME = "queue_name"; + private final static String COLUMN_COUNTER_VALUE = "counter_value"; + private final static String COLUMN_MESSAGE_TYPE = "message_type"; // design note: counters based on DataStax example here: // https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html - static final String CQL = + private static final String CQL = "CREATE TABLE IF NOT EXISTS message_counters ( " + "counter_value counter, " + "queue_name varchar, " + @@ -75,11 +75,11 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat /** number of changes since last save to database */ - final AtomicInteger numChanges = new AtomicInteger( 0 ); + private final AtomicInteger numChanges = new AtomicInteger( 0 ); - final long maxChangesBeforeSave; + private final long maxChangesBeforeSave; - class InMemoryCount { + private class InMemoryCount { long baseCount; final AtomicLong totalInMemoryCount = new AtomicLong( 0L ); // for testing using only in-memory counter final AtomicLong increment = new AtomicLong( 0L ); @@ -103,7 +103,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat public long getDecrement() { return decrement.get(); } - public void clearDeltas() { + private void clearDeltas() { this.increment.set( 0L ); this.decrement.set( 0L ); } @@ -125,7 +125,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat } } - private Map<String, InMemoryCount> inMemoryCounters = new ConcurrentHashMap<>(200); + private final Map<String, InMemoryCount> inMemoryCounters = new ConcurrentHashMap<>(200); @Inject @@ -174,7 +174,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat if ( logger.isDebugEnabled() ) { long value = inMemoryCounters.get( key ).value(); if (value <= 0) { - logger.debug( "Queue {} type {} decremented count = {}", queueName, type, value ); + logger.debug( "Queue {} type {} incremented {} count = {}", queueName, type, increment, value ); } } } @@ -200,7 +200,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat } } - InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + final InMemoryCount inMemoryCount = inMemoryCounters.get( key ); synchronized ( inMemoryCount ) { inMemoryCount.decrement( decrement ); @@ -285,7 +285,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat } - void incrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long increment ) { + private void incrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long increment ) { Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS ) .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) ) @@ -295,7 +295,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat } - void decrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long decrement ) { + private void decrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long decrement ) { Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS ) .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) ) @@ -305,7 +305,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat } - Long retrieveCounterFromStorage( String queueName, DatabaseQueueMessage.Type type ) { + private Long retrieveCounterFromStorage( String queueName, DatabaseQueueMessage.Type type ) { Statement query = QueryBuilder.select().from( TABLE_MESSAGE_COUNTERS ) .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )