Rename send message methods to sendMessageToAllRegions() and 
sendMessageToLocalRegion(), and fix implementation of both in Qakka impl.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5a81eb29
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5a81eb29
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5a81eb29

Branch: refs/heads/usergrid-1318-queue
Commit: 5a81eb2980b05cf34684c2c47489fed21daf6585
Parents: ae16458
Author: Dave Johnson <snoopd...@apache.org>
Authored: Wed Nov 9 10:45:58 2016 -0500
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Wed Nov 9 10:45:58 2016 -0500

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 37 +++++++++++++++++---
 .../usergrid/persistence/qakka/QakkaFig.java    | 16 ++++++---
 .../persistence/queue/LegacyQueueManager.java   |  4 +--
 .../persistence/queue/LocalQueueManager.java    |  6 ++--
 .../queue/impl/QakkaQueueManager.java           | 26 ++++++++------
 .../queue/impl/SNSQueueManagerImpl.java         |  6 ++--
 .../queue/LegacyQueueManagerTest.java           |  4 +--
 .../impl/ApplicationQueueManagerImpl.java       |  2 +-
 .../services/queues/ImportQueueManager.java     |  4 +--
 9 files changed, 71 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 9f931d3..66b527b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -200,7 +200,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
 
         try {
             //signal to SQS
-            this.queue.sendMessage( operation );
+            this.queue.sendMessageToLocalRegion( operation );
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -215,9 +215,9 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         try {
             //signal to SQS
             if (forUtilityQueue) {
-                this.utilityQueue.sendMessageToTopic(operation);
+                this.utilityQueue.sendMessageToAllRegions(operation);
             } else {
-                this.queue.sendMessageToTopic(operation);
+                this.queue.sendMessageToAllRegions(operation);
             }
         }
         catch ( IOException e ) {
@@ -461,6 +461,11 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
     public void queueInitializeApplicationIndex( final ApplicationScope 
applicationScope) {
         IndexLocationStrategy indexLocationStrategy = 
indexLocationStrategyFactory.getIndexLocationStrategy(
             applicationScope);
+
+
+        logger.trace("Offering InitializeApplicationIndexEvent for {}:{}",
+            applicationScope.getApplication().getUuid(), 
applicationScope.getApplication().getType());
+
         offerTopic( new InitializeApplicationIndexEvent( 
queueFig.getPrimaryRegion(),
             new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), 
false);
     }
@@ -471,7 +476,11 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
                                        final Entity entity, long updatedAfter) 
{
 
 
-        offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),new 
EntityIdScope(applicationScope, entity.getId()), 0));
+        logger.trace("Offering EntityIndexEvent for {}:{}",
+            entity.getId().getUuid(), entity.getId().getType());
+
+        offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),
+            new EntityIdScope(applicationScope, entity.getId()), 0));
 
         final EntityIndexOperation entityIndexOperation =
             new EntityIndexOperation( applicationScope, entity.getId(), 
updatedAfter);
@@ -517,6 +526,9 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
                              final Entity entity,
                              final Edge newEdge) {
 
+        logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
+            newEdge.getType(), entity.getId().getUuid(), 
entity.getId().getType());
+
         offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), 
applicationScope, entity.getId(), newEdge ));
 
     }
@@ -549,6 +561,9 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
     public void queueDeleteEdge(final ApplicationScope applicationScope,
                                 final Edge edge) {
 
+        logger.trace("Offering EdgeDeleteEvent for type {} to target {}:{}",
+            edge.getType(), edge.getTargetNode().getUuid(), 
edge.getTargetNode().getType());
+
         // sent in region (not offerTopic) as the delete IO happens in-region, 
then queues a multi-region de-index op
         offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), 
applicationScope, edge ) );
     }
@@ -610,6 +625,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
 
         //send to the topic so all regions index the batch
 
+        logger.trace("Offering ElasticsearchIndexEvent for message {}", 
newMessageId );
+
         offerTopic( elasticsearchIndexEvent, forUtilityQueue );
     }
 
@@ -678,6 +695,10 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
     public void queueDeIndexOldVersion(final ApplicationScope 
applicationScope, final Id entityId, UUID markedVersion) {
 
         // queue the de-index of old versions to the topic so cleanup happens 
in all regions
+
+        logger.trace("Offering DeIndexOldVersionsEvent for app {} {}:{}",
+            applicationScope.getApplication().getUuid(), entityId.getUuid(), 
entityId.getType());
+
         offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
             new EntityIdScope( applicationScope, entityId), markedVersion), 
false);
 
@@ -736,6 +757,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
     @Override
     public void queueEntityDelete(final ApplicationScope applicationScope, 
final Id entityId) {
 
+        logger.trace("Offering EntityDeleteEvent for {}:{}", 
entityId.getUuid(), entityId.getType());
+
         // sent in region (not offerTopic) as the delete IO happens in-region, 
then queues a multi-region de-index op
         offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new 
EntityIdScope( applicationScope, entityId ) ) );
     }
@@ -894,7 +917,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
                                                          submitToIndex( 
indexEventResults, isUtilityQueue );
 
                                                      if ( messagesToAck.size() 
< messages.size() ) {
-                                                         logger.warn( "Missing 
{} message(s) from index processing",
+                                                         logger.warn(
+                                                             "Missing {} 
message(s) from index processing",
                                                             messages.size() - 
messagesToAck.size() );
                                                      }
 
@@ -916,6 +940,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
                                                      //do not rethrow so we 
can process all of them
                                                  }
                                              } ).subscribeOn( 
rxTaskScheduler.getAsyncIOScheduler() );
+
                             //end flatMap
                         }, indexProcessorFig.getEventConcurrencyFactor() );
 
@@ -983,6 +1008,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
 
         });
 
+        logger.trace("Offering batch of EntityIndexEvent of size {}", 
batch.size());
+
         offerBatch( batch, forUtilityQueue );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 105c193..c034b92 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -40,9 +40,13 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     String QUEUE_TIMEOUT_SECONDS                  = "queue.timeout.seconds";
 
-    String QUEUE_REFRESH_MILLISECONDS             = "queue.refresh.millis";
+    String QUEUE_REFRESH_MILLISECONDS             = 
"queue.inmemory.refresh.millis";
 
-    String QUEUE_INMEMORY_SIZE                    = "queue.inmemory.size";
+    String QUEUE_IN_MEMORY                        = "queue.inmemory.cache";
+
+    String QUEUE_INMEMORY_SIZE                    = 
"queue.inmemory.cache.size";
+
+    String QUEUE_IN_MEMORY_REFRESH_ASYNC          = 
"queue.inmemory.cache.async";
 
     String QUEUE_SEND_MAX_RETRIES                 = "queue.send.max.retries";
 
@@ -53,9 +57,11 @@ public interface QakkaFig extends GuicyFig, Serializable {
     String QUEUE_GET_TIMEOUT                      = 
"queue.get.timeout.seconds";
 
     String QUEUE_SHARD_COUNTER_MAX_IN_MEMORY      = 
"queue.shard.counter.max-in-memory";
+
     String QUEUE_SHARD_COUNTER_WRITE_TIMEOUT      = 
"queue.shard.counter.write-timeout";
 
     String QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY    = 
"queue.message.counter.max-in-memory";
+
     String QUEUE_MESSAGE_COUNTER_WRITE_TIMEOUT    = 
"queue.message.counter.write-timeout";
 
     String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = 
"queue.shard.allocation.check.frequency.millis";
@@ -68,8 +74,6 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     String QUEUE_MAX_TTL                          = "queue.max.ttl";
 
-    String QUEUE_IN_MEMORY                        = "queue.in-memory.cache";
-
 
 
     /** True if Qakka is running standlone */
@@ -172,4 +176,8 @@ public interface QakkaFig extends GuicyFig, Serializable {
     @Key(QUEUE_IN_MEMORY)
     @Default("false")
     boolean getInMemoryCache();
+
+    @Key(QUEUE_IN_MEMORY_REFRESH_ASYNC)
+    @Default("true")
+    boolean getInMemoryRefreshAsync();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
index e38d3bc..6627148 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
@@ -63,13 +63,13 @@ public interface LegacyQueueManager {
      * @param body
      * @throws IOException
      */
-    <T extends Serializable> void sendMessage(T body)throws IOException;
+    <T extends Serializable> void sendMessageToLocalRegion(T body)throws 
IOException;
 
     /**
      * Send a messae to the topic to be sent to other queues
      * @param body
      */
-    <T extends Serializable> void sendMessageToTopic(T body) throws 
IOException;
+    <T extends Serializable> void sendMessageToAllRegions(T body) throws 
IOException;
 
     /**
      * purge messages

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
index 630c2e7..90621c0 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
@@ -85,7 +85,7 @@ public class LocalQueueManager implements LegacyQueueManager {
 
 
     @Override
-    public <T extends Serializable> void sendMessage( final T body ) throws 
IOException {
+    public <T extends Serializable> void sendMessageToLocalRegion(final T body 
) throws IOException {
         String uuid = UUID.randomUUID().toString();
         try {
             queue.offer(new LegacyQueueMessage(uuid, "handle_" + uuid, body, 
"put type here"),5000,TimeUnit.MILLISECONDS);
@@ -97,8 +97,8 @@ public class LocalQueueManager implements LegacyQueueManager {
 
 
     @Override
-    public <T extends Serializable> void sendMessageToTopic( final T body ) 
throws IOException {
-       sendMessage( body );
+    public <T extends Serializable> void sendMessageToAllRegions(final T body 
) throws IOException {
+       sendMessageToLocalRegion( body );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index 9d7a341..6900a46 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -21,12 +21,9 @@ import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.ProtocolVersion;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
-import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.*;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
-import org.apache.usergrid.persistence.queue.LegacyQueueFig;
 import org.apache.usergrid.persistence.queue.LegacyQueueManager;
 import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
 import org.apache.usergrid.persistence.queue.LegacyQueueScope;
@@ -77,10 +74,7 @@ public class QakkaQueueManager implements LegacyQueueManager 
{
     }
 
 
-    @Override
-    public <T extends Serializable> void sendMessage(T body) throws 
IOException {
-
-        logger.debug( "Sending message to queue {} region {}", 
this.scope.getRegionImplementation().name() );
+    private <T extends Serializable> void doSendMessage( T body, List<String> 
regions ) throws IOException {
 
         createQueueIfNecessary();
 
@@ -93,7 +87,7 @@ public class QakkaQueueManager implements LegacyQueueManager {
 
         queueMessageManager.sendMessages(
             scope.getName(),
-            regions.getRegions( scope.getRegionImplementation().name() ),
+            regions,
             null, // delay millis
             null, // expiration seconds
             "application/octet-stream",
@@ -102,8 +96,18 @@ public class QakkaQueueManager implements 
LegacyQueueManager {
 
 
     @Override
-    public <T extends Serializable> void sendMessageToTopic(T body) throws 
IOException {
-        sendMessage( body );
+    public <T extends Serializable> void sendMessageToLocalRegion(T body) 
throws IOException {
+        List<String> regionsList = regions.getRegions( Regions.LOCAL );
+        logger.trace( "Sending message to queue {} local region {}", 
scope.getName(), regionsList );
+        doSendMessage( body, regionsList );
+    }
+
+
+    @Override
+    public <T extends Serializable> void sendMessageToAllRegions(T body) 
throws IOException {
+        List<String> regionsList = regions.getRegions( Regions.ALL );
+        logger.trace( "Sending message to queue {} all regions {}", 
scope.getName(), regionsList );
+        doSendMessage( body, regionsList );
     }
 
 
@@ -175,7 +179,7 @@ public class QakkaQueueManager implements 
LegacyQueueManager {
     public void sendMessages( List bodies ) throws IOException {
 
         for ( Object body : bodies ) {
-            sendMessage( (Serializable)body );
+            sendMessageToLocalRegion( (Serializable)body );
         }
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 637f157..853fcdd 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -539,7 +539,7 @@ public class SNSQueueManagerImpl implements 
LegacyQueueManager {
 
 
     @Override
-    public <T extends Serializable> void sendMessageToTopic( final T body ) 
throws IOException {
+    public <T extends Serializable> void sendMessageToAllRegions(final T body 
) throws IOException {
         if ( snsAsync == null ) {
             logger.error( "SNS client is null, perhaps it failed to initialize 
successfully" );
             return;
@@ -582,13 +582,13 @@ public class SNSQueueManagerImpl implements 
LegacyQueueManager {
         }
 
         for ( Object body : bodies ) {
-            sendMessage( ( Serializable ) body );
+            sendMessageToLocalRegion( ( Serializable ) body );
         }
     }
 
 
     @Override
-    public <T extends Serializable> void sendMessage( final T body ) throws 
IOException {
+    public <T extends Serializable> void sendMessageToLocalRegion(final T body 
) throws IOException {
 
         if ( sqsAsync == null ) {
             logger.error( "SQS client is null, perhaps it failed to initialize 
successfully" );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
index 13fb195..535c30c 100644
--- 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
@@ -25,8 +25,6 @@ import com.google.inject.Injector;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.qakka.AbstractTest;
 import org.apache.usergrid.persistence.qakka.App;
-import org.apache.usergrid.persistence.qakka.core.CassandraClient;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
 import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
 import org.junit.Test;
@@ -68,7 +66,7 @@ public class LegacyQueueManagerTest extends AbstractTest {
         LegacyQueueManager qm = qmf.getQueueManager(scope);
 
         String value = "bodytest";
-        qm.sendMessage(value);
+        qm.sendMessageToLocalRegion(value);
 
         Thread.sleep(5000);
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 96e2dbd..d26cd5f 100644
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -356,7 +356,7 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
                             if(logger.isTraceEnabled()) {
                                 logger.trace("Queueing notification message 
for device: {}", message.get().getDeviceId());
                             }
-                            qm.sendMessage( message.get() );
+                            qm.sendMessageToLocalRegion( message.get() );
                             queueMeter.mark();
                         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a81eb29/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index d9db84a..47758a3 100644
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -65,13 +65,13 @@ public class ImportQueueManager implements 
LegacyQueueManager {
 
 
     @Override
-    public <T extends Serializable> void sendMessage( final T body ) throws 
IOException {
+    public <T extends Serializable> void sendMessageToLocalRegion(final T body 
) throws IOException {
 
     }
 
 
     @Override
-    public <T extends Serializable> void sendMessageToTopic( final T body ) 
throws IOException {
+    public <T extends Serializable> void sendMessageToAllRegions(final T body 
) throws IOException {
 
     }
 

Reply via email to