Rename send message methods to sendMessageToAllRegions() and sendMessageToLocalRegion(), and fix implementation of both in Hakka imply.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5a81eb29 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5a81eb29 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5a81eb29 Branch: refs/heads/usergrid-1318-queue Commit: 5a81eb2980b05cf34684c2c47489fed21daf6585 Parents: ae16458 Author: Dave Johnson <snoopd...@apache.org> Authored: Wed Nov 9 10:45:58 2016 -0500 Committer: Dave Johnson <snoopd...@apache.org> Committed: Wed Nov 9 10:45:58 2016 -0500 ---------------------------------------------------------------------- .../asyncevents/AsyncEventServiceImpl.java | 37 +++++++++++++++++--- .../usergrid/persistence/qakka/QakkaFig.java | 16 ++++++--- .../persistence/queue/LegacyQueueManager.java | 4 +-- .../persistence/queue/LocalQueueManager.java | 6 ++-- .../queue/impl/QakkaQueueManager.java | 26 ++++++++------ .../queue/impl/SNSQueueManagerImpl.java | 6 ++-- .../queue/LegacyQueueManagerTest.java | 4 +-- .../impl/ApplicationQueueManagerImpl.java | 2 +- .../services/queues/ImportQueueManager.java | 4 +-- 9 files changed, 71 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 9f931d3..66b527b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -200,7 +200,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { try { //signal to SQS - this.queue.sendMessage( operation ); + this.queue.sendMessageToLocalRegion( operation ); } catch (IOException e) { throw new RuntimeException("Unable to queue message", e); } finally { @@ -215,9 +215,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { try { //signal to SQS if (forUtilityQueue) { - this.utilityQueue.sendMessageToTopic(operation); + this.utilityQueue.sendMessageToAllRegions(operation); } else { - this.queue.sendMessageToTopic(operation); + this.queue.sendMessageToAllRegions(operation); } } catch ( IOException e ) { @@ -461,6 +461,11 @@ public class AsyncEventServiceImpl implements AsyncEventService { public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) { IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy( applicationScope); + + + logger.trace("Offering InitializeApplicationIndexEvent for {}:{}", + applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType()); + offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), false); } @@ -471,7 +476,11 @@ public class AsyncEventServiceImpl implements AsyncEventService { final Entity entity, long updatedAfter) { - offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),new EntityIdScope(applicationScope, entity.getId()), 0)); + logger.trace("Offering EntityIndexEvent for {}:{}", + entity.getId().getUuid(), entity.getId().getType()); + + offer(new EntityIndexEvent(queueFig.getPrimaryRegion(), + new EntityIdScope(applicationScope, entity.getId()), 0)); final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entity.getId(), updatedAfter); @@ -517,6 +526,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { final Entity entity, final Edge newEdge) { + logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}", + newEdge.getType(), entity.getId().getUuid(), entity.getId().getType()); + offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge )); } @@ -549,6 +561,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { public void queueDeleteEdge(final ApplicationScope applicationScope, final Edge edge) { + logger.trace("Offering EdgeDeleteEvent for type {} to target {}:{}", + edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType()); + // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) ); } @@ -610,6 +625,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { //send to the topic so all regions index the batch + logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId ); + offerTopic( elasticsearchIndexEvent, forUtilityQueue ); } @@ -678,6 +695,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { public void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId, UUID markedVersion) { // queue the de-index of old versions to the topic so cleanup happens in all regions + + logger.trace("Offering DeIndexOldVersionsEvent for app {} {}:{}", + applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType()); + offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId), markedVersion), false); @@ -736,6 +757,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { @Override public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) { + logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType()); + // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ); } @@ -894,7 +917,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { submitToIndex( indexEventResults, isUtilityQueue ); if ( messagesToAck.size() < messages.size() ) { - logger.warn( "Missing {} message(s) from index processing", + logger.warn( + "Missing {} message(s) from index processing", messages.size() - messagesToAck.size() ); } @@ -916,6 +940,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { //do not rethrow so we can process all of them } } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); + //end flatMap }, indexProcessorFig.getEventConcurrencyFactor() ); @@ -983,6 +1008,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { }); + logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size()); + offerBatch( batch, forUtilityQueue ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java index 105c193..c034b92 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -40,9 +40,13 @@ public interface QakkaFig extends GuicyFig, Serializable { String QUEUE_TIMEOUT_SECONDS = "queue.timeout.seconds"; - String QUEUE_REFRESH_MILLISECONDS = "queue.refresh.millis"; + String QUEUE_REFRESH_MILLISECONDS = "queue.inmemory.refresh.millis"; - String QUEUE_INMEMORY_SIZE = "queue.inmemory.size"; + String QUEUE_IN_MEMORY = "queue.inmemory.cache"; + + String QUEUE_INMEMORY_SIZE = "queue.inmemory.cache.size"; + + String QUEUE_IN_MEMORY_REFRESH_ASYNC = "queue.inmemory.cache.async"; String QUEUE_SEND_MAX_RETRIES = "queue.send.max.retries"; @@ -53,9 +57,11 @@ public interface QakkaFig extends GuicyFig, Serializable { String QUEUE_GET_TIMEOUT = "queue.get.timeout.seconds"; String QUEUE_SHARD_COUNTER_MAX_IN_MEMORY = "queue.shard.counter.max-in-memory"; + String QUEUE_SHARD_COUNTER_WRITE_TIMEOUT = "queue.shard.counter.write-timeout"; String QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY = "queue.message.counter.max-in-memory"; + String QUEUE_MESSAGE_COUNTER_WRITE_TIMEOUT = "queue.message.counter.write-timeout"; String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = "queue.shard.allocation.check.frequency.millis"; @@ -68,8 +74,6 @@ public interface QakkaFig extends GuicyFig, Serializable { String QUEUE_MAX_TTL = "queue.max.ttl"; - String QUEUE_IN_MEMORY = "queue.in-memory.cache"; - /** True if Qakka is running standlone */ @@ -172,4 +176,8 @@ public interface QakkaFig extends GuicyFig, Serializable { @Key(QUEUE_IN_MEMORY) @Default("false") boolean getInMemoryCache(); + + @Key(QUEUE_IN_MEMORY_REFRESH_ASYNC) + @Default("true") + boolean getInMemoryRefreshAsync(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java index e38d3bc..6627148 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java @@ -63,13 +63,13 @@ public interface LegacyQueueManager { * @param body * @throws IOException */ - <T extends Serializable> void sendMessage(T body)throws IOException; + <T extends Serializable> void sendMessageToLocalRegion(T body)throws IOException; /** * Send a messae to the topic to be sent to other queues * @param body */ - <T extends Serializable> void sendMessageToTopic(T body) throws IOException; + <T extends Serializable> void sendMessageToAllRegions(T body) throws IOException; /** * purge messages http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java index 630c2e7..90621c0 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java @@ -85,7 +85,7 @@ public class LocalQueueManager implements LegacyQueueManager { @Override - public <T extends Serializable> void sendMessage( final T body ) throws IOException { + public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException { String uuid = UUID.randomUUID().toString(); try { queue.offer(new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here"),5000,TimeUnit.MILLISECONDS); @@ -97,8 +97,8 @@ public class LocalQueueManager implements LegacyQueueManager { @Override - public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException { - sendMessage( body ); + public <T extends Serializable> void sendMessageToAllRegions(final T body ) throws IOException { + sendMessageToLocalRegion( body ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java index 9d7a341..6900a46 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java @@ -21,12 +21,9 @@ import com.datastax.driver.core.DataType; import com.datastax.driver.core.ProtocolVersion; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.*; import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; -import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; -import org.apache.usergrid.persistence.queue.LegacyQueueFig; import org.apache.usergrid.persistence.queue.LegacyQueueManager; import org.apache.usergrid.persistence.queue.LegacyQueueMessage; import org.apache.usergrid.persistence.queue.LegacyQueueScope; @@ -77,10 +74,7 @@ public class QakkaQueueManager implements LegacyQueueManager { } - @Override - public <T extends Serializable> void sendMessage(T body) throws IOException { - - logger.debug( "Sending message to queue {} region {}", this.scope.getRegionImplementation().name() ); + private <T extends Serializable> void doSendMessage( T body, List<String> regions ) throws IOException { createQueueIfNecessary(); @@ -93,7 +87,7 @@ public class QakkaQueueManager implements LegacyQueueManager { queueMessageManager.sendMessages( scope.getName(), - regions.getRegions( scope.getRegionImplementation().name() ), + regions, null, // delay millis null, // expiration seconds "application/octet-stream", @@ -102,8 +96,18 @@ public class QakkaQueueManager implements LegacyQueueManager { @Override - public <T extends Serializable> void sendMessageToTopic(T body) throws IOException { - sendMessage( body ); + public <T extends Serializable> void sendMessageToLocalRegion(T body) throws IOException { + List<String> regionsList = regions.getRegions( Regions.LOCAL ); + logger.trace( "Sending message to queue {} local region {}", scope.getName(), regionsList ); + doSendMessage( body, regionsList ); + } + + + @Override + public <T extends Serializable> void sendMessageToAllRegions(T body) throws IOException { + List<String> regionsList = regions.getRegions( Regions.ALL ); + logger.trace( "Sending message to queue {} all regions {}", scope.getName(), regionsList ); + doSendMessage( body, regionsList ); } @@ -175,7 +179,7 @@ public class QakkaQueueManager implements LegacyQueueManager { public void sendMessages( List bodies ) throws IOException { for ( Object body : bodies ) { - sendMessage( (Serializable)body ); + sendMessageToLocalRegion( (Serializable)body ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index 637f157..853fcdd 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -539,7 +539,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { @Override - public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException { + public <T extends Serializable> void sendMessageToAllRegions(final T body ) throws IOException { if ( snsAsync == null ) { logger.error( "SNS client is null, perhaps it failed to initialize successfully" ); return; @@ -582,13 +582,13 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { } for ( Object body : bodies ) { - sendMessage( ( Serializable ) body ); + sendMessageToLocalRegion( ( Serializable ) body ); } } @Override - public <T extends Serializable> void sendMessage( final T body ) throws IOException { + public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException { if ( sqsAsync == null ) { logger.error( "SQS client is null, perhaps it failed to initialize successfully" ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java index 13fb195..535c30c 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java @@ -25,8 +25,6 @@ import com.google.inject.Injector; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.qakka.AbstractTest; import org.apache.usergrid.persistence.qakka.App; -import org.apache.usergrid.persistence.qakka.core.CassandraClient; -import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl; import org.junit.Test; @@ -68,7 +66,7 @@ public class LegacyQueueManagerTest extends AbstractTest { LegacyQueueManager qm = qmf.getQueueManager(scope); String value = "bodytest"; - qm.sendMessage(value); + qm.sendMessageToLocalRegion(value); Thread.sleep(5000); http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index 96e2dbd..d26cd5f 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -356,7 +356,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { if(logger.isTraceEnabled()) { logger.trace("Queueing notification message for device: {}", message.get().getDeviceId()); } - qm.sendMessage( message.get() ); + qm.sendMessageToLocalRegion( message.get() ); queueMeter.mark(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java index d9db84a..47758a3 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java @@ -65,13 +65,13 @@ public class ImportQueueManager implements LegacyQueueManager { @Override - public <T extends Serializable> void sendMessage( final T body ) throws IOException { + public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException { } @Override - public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException { + public <T extends Serializable> void sendMessageToAllRegions(final T body ) throws IOException { }