Allow submission to SNS/SQS via sync client

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9a151089
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9a151089
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9a151089

Branch: refs/heads/expose-reindex
Commit: 9a15108924b29997c9ca440b467ab8006a8f8acb
Parents: 173be8a
Author: Peter Johnson <pjohn...@apigee.com>
Authored: Wed Sep 27 08:19:20 2017 -0700
Committer: Peter Johnson <pjohn...@apigee.com>
Committed: Wed Sep 27 08:19:20 2017 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  75 +++++++------
 .../corepersistence/CpRelationManager.java      |  52 ++++-----
 .../asyncevents/AsyncEventService.java          |   6 +-
 .../asyncevents/AsyncEventServiceImpl.java      |  44 +++++---
 .../asyncevents/EventBuilderImpl.java           |   2 +
 .../corepersistence/util/CpCollectionUtils.java | 104 +++++++++++++++++
 .../index/AsyncIndexServiceTest.java            |   2 +-
 .../persistence/queue/LegacyQueueFig.java       |   5 +
 .../persistence/queue/LegacyQueueManager.java   |  11 +-
 .../persistence/queue/LocalQueueManager.java    |  11 +-
 .../queue/impl/QakkaQueueManager.java           |  12 +-
 .../queue/impl/SNSQueueManagerImpl.java         | 112 +++++++++++++++++--
 .../queue/LegacyQueueManagerTest.java           |   2 +-
 .../impl/ApplicationQueueManagerImpl.java       |   2 +-
 .../services/queues/ImportQueueManager.java     |   8 +-
 15 files changed, 341 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 68c4ef0..1dc4a89 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -34,6 +34,7 @@ import 
org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
 import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
 import org.apache.usergrid.corepersistence.service.CollectionService;
 import org.apache.usergrid.corepersistence.service.ConnectionService;
+import org.apache.usergrid.corepersistence.util.CpCollectionUtils;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.mq.QueueManager;
@@ -161,7 +162,6 @@ public class CpEntityManager implements EntityManager {
 
     public QueueManagerFactory queueManagerFactory;
 
-
     //    /** Short-term cache to keep us from reloading same Entity during 
single request. */
 //    private LoadingCache<EntityScope, 
org.apache.usergrid.persistence.model.entity.Entity> entityCache;
 
@@ -521,6 +521,10 @@ public class CpEntityManager implements EntityManager {
 
         cpEntity = CpEntityMapUtils.fromMap( cpEntity, entity.getProperties(), 
entity.getType(), true );
 
+        String entityType = cpEntity.getId().getType();
+        boolean skipIndexingForType = skipIndexingForType(entityType);
+        Boolean asyncIndex = asyncIndexingForType(entityType);
+
         try {
 
             String region = lookupAuthoritativeRegionForType( entity.getType() 
);
@@ -546,38 +550,32 @@ public class CpEntityManager implements EntityManager {
             handleWriteUniqueVerifyException( entity, wuve );
         }
 
-        if ( !skipIndexingForType( cpEntity.getId().getType() ) ) {
+        if (!skipIndexingForType) {
+            indexEntity(cpEntity, asyncIndex);
+            deIndexOldVersionsOfEntity(cpEntity);
+        }
+    }
 
-            // queue an event to update the new entity
-            indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 
);
+    private void 
indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, 
Boolean async) {
+        // queue an event to update the new entity
+        indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , 
async);
+    }
 
-            // queue up an event to clean-up older versions than this one from 
the index
-            if (entityManagerFig.getDeindexOnUpdate()) {
-                indexService.queueDeIndexOldVersion( applicationScope, 
cpEntity.getId(), cpEntity.getVersion());
-            }
+    private void 
deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity 
cpEntity) {
+        // queue up an event to clean-up older versions than this one from the 
index
+        if (entityManagerFig.getDeindexOnUpdate()) {
+            indexService.queueDeIndexOldVersion( applicationScope, 
cpEntity.getId(), cpEntity.getVersion());
         }
     }
 
-    private boolean skipIndexingForType( String type ) {
-
-        boolean skipIndexing = false;
-        String collectionName = Schema.defaultCollectionName( type );
 
+    private Boolean asyncIndexingForType( String type ) {
+        return 
CpCollectionUtils.asyncIndexingForType(collectionSettingsFactory, 
applicationId, type);
 
-        CollectionSettings collectionSettings = collectionSettingsFactory
-            .getInstance( new CollectionSettingsScopeImpl(getAppIdObject(), 
collectionName) );
-        Optional<Map<String, Object>> existingSettings =
-            collectionSettings.getCollectionSettings( collectionName );
-
-        if ( existingSettings.isPresent()) {
-            Map jsonMapData = existingSettings.get();
-            Object fields = jsonMapData.get("fields");
-            if ( fields != null && "none".equalsIgnoreCase( fields.toString() 
) ) {
-                skipIndexing = true;
-            }
-        }
+    }
 
-        return skipIndexing;
+    private boolean skipIndexingForType( String type ) {
+        return 
CpCollectionUtils.skipIndexingForType(collectionSettingsFactory, applicationId, 
type);
     }
 
 
@@ -1153,7 +1151,7 @@ public class CpEntityManager implements EntityManager {
         //Adding graphite metrics
 
         if ( !skipIndexingForType( cpEntity.getId().getType() ) ) {
-            indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 
);
+            indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 
, null);
         }
     }
 
@@ -1813,9 +1811,10 @@ public class CpEntityManager implements EntityManager {
             updatedSettings.put( "lastReindexed", 0 );
         }
 
-        // if fields specified, then put in settings
-        if ( newSettings.get("fields") != null ) {
-            updatedSettings.put("fields", newSettings.get("fields"));
+        for (String validName : CpCollectionUtils.getValidSettings()) {
+            if (newSettings.containsKey(validName)) {
+                updatedSettings.put(validName, newSettings.get(validName));
+            }
         }
 
         // if region specified
@@ -2854,14 +2853,8 @@ public class CpEntityManager implements EntityManager {
         entity.setProperties( cpEntity );
 
         // add to and index in collection of the application
-        if ( !is_application ) {
-
-            String collectionName = Schema.defaultCollectionName( eType );
-            CpRelationManager cpr = ( CpRelationManager ) getRelationManager( 
getApplication() );
-            cpr.addToCollection( collectionName, entity );
-
-            // Invoke counters
-            incrementEntityCollection( collectionName, timestamp );
+        if ( !is_application) {
+            updateIndexForEniity(eType, entity, timestamp);
         }
 
         //write to our types map
@@ -2871,6 +2864,14 @@ public class CpEntityManager implements EntityManager {
         return entity;
     }
 
+    private <A extends Entity> void updateIndexForEniity(String eType, A 
entity,  long timestamp) throws Exception {
+        String collectionName = Schema.defaultCollectionName( eType );
+        CpRelationManager cpr = ( CpRelationManager ) getRelationManager( 
getApplication() );
+        cpr.addToCollection( collectionName, entity );
+
+        // Invoke counters
+        incrementEntityCollection( collectionName, timestamp );
+    }
 
     private void incrementEntityCollection( String collection_name, long 
cassandraTimestamp ) {
         try {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index c02ca7d..06f06ad 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -31,6 +31,7 @@ import 
org.apache.usergrid.corepersistence.service.CollectionSearch;
 import org.apache.usergrid.corepersistence.service.CollectionService;
 import org.apache.usergrid.corepersistence.service.ConnectionSearch;
 import org.apache.usergrid.corepersistence.service.ConnectionService;
+import org.apache.usergrid.corepersistence.util.CpCollectionUtils;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.*;
@@ -349,6 +350,7 @@ public class CpRelationManager implements RelationManager {
         Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
         org.apache.usergrid.persistence.model.entity.Entity memberEntity = ( ( 
CpEntityManager ) em ).load( entityId );
 
+        Id memberEntityId = memberEntity.getId();
 
         // don't fetch entity if we've already got one
         final Entity itemEntity;
@@ -364,7 +366,7 @@ public class CpRelationManager implements RelationManager {
         }
 
 
-        if ( memberEntity == null ) {
+        if ( memberEntityId == null ) {
             throw new RuntimeException(
                 "Unable to load entity uuid=" + itemRef.getUuid() + " type=" + 
itemRef.getType() );
         }
@@ -376,7 +378,7 @@ public class CpRelationManager implements RelationManager {
 
 
         // create graph edge connection from head entity to member entity
-        final Edge edge = createCollectionEdge( cpHeadEntity.getId(), 
collectionName, memberEntity.getId() );
+        final Edge edge = createCollectionEdge( cpHeadEntity.getId(), 
collectionName, memberEntityId );
         final String linkedCollection = collection.getLinkedCollection();
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
@@ -387,21 +389,24 @@ public class CpRelationManager implements RelationManager 
{
             }
         } ).filter( writtenEdge -> linkedCollection != null ).flatMap( 
writtenEdge -> {
             final String pluralType = InflectionUtils.pluralize( 
cpHeadEntity.getId().getType() );
-            final Edge reverseEdge = createCollectionEdge( 
memberEntity.getId(), pluralType, cpHeadEntity.getId() );
+            final Edge reverseEdge = createCollectionEdge( memberEntityId, 
pluralType, cpHeadEntity.getId() );
 
             //reverse
             return gm.writeEdge( reverseEdge ).doOnNext( reverseEdgeWritten -> 
{
 
-                if ( !skipIndexingForType( cpHeadEntity.getId().getType() ) ) {
-
-                    indexService.queueNewEdge(applicationScope, cpHeadEntity, 
reverseEdge);
+                String entityType = cpHeadEntity.getId().getType();
+                if ( !skipIndexingForType( entityType) ) {
+                    Boolean async = asyncIndexingForType(entityType);
+                    indexService.queueNewEdge(applicationScope, 
cpHeadEntity.getId(), reverseEdge, async);
                 }
 
             } );
         } ).doOnCompleted( () -> {
 
-            if ( !skipIndexingForType( memberEntity.getId().getType() ) ) {
-                indexService.queueNewEdge(applicationScope, memberEntity, 
edge);
+            String entityType = memberEntity.getId().getType();
+            if ( !skipIndexingForType( entityType ) ) {
+                Boolean async = asyncIndexingForType(entityType);
+                indexService.queueNewEdge(applicationScope, memberEntityId, 
edge, async);
             }
 
 
@@ -731,9 +736,10 @@ public class CpRelationManager implements RelationManager {
         gm.writeEdge(edge).toBlocking().lastOrDefault(null); //throw an 
exception if this fails
 
 
-        if ( !skipIndexingForType( targetEntity.getId().getType() ) ) {
-
-            indexService.queueNewEdge(applicationScope, targetEntity, edge);
+        String entityType = targetEntity.getId().getType();
+        if ( !skipIndexingForType( entityType ) ) {
+            Boolean async = asyncIndexingForType(entityType);
+            indexService.queueNewEdge(applicationScope, targetEntity.getId(), 
edge, async);
         }
 
         // remove any duplicate edges (keeps the duplicate edge with same 
timestamp)
@@ -1094,27 +1100,13 @@ public class CpRelationManager implements 
RelationManager {
 
     }
 
-    private boolean skipIndexingForType( String type ) {
-
-        boolean skipIndexing = false;
-
-        String collectionName = Schema.defaultCollectionName( type );
+    private Boolean asyncIndexingForType( String type ) {
+        return 
CpCollectionUtils.asyncIndexingForType(collectionSettingsFactory, 
applicationId, type);
 
-        CollectionSettings collectionSettings =
-            collectionSettingsFactory.
-                getInstance( new CollectionSettingsScopeImpl(new SimpleId( 
applicationId, TYPE_APPLICATION ), collectionName ) );
-        Optional<Map<String, Object>> collectionIndexingSchema =
-            collectionSettings.getCollectionSettings( collectionName );
-
-        if ( collectionIndexingSchema.isPresent()) {
-            Map jsonMapData = collectionIndexingSchema.get();
-            final Object fields = jsonMapData.get( "fields" );
-            if ( fields != null && fields instanceof String && 
"none".equalsIgnoreCase( fields.toString())) {
-                skipIndexing = true;
-            }
-        }
+    }
 
-        return skipIndexing;
+    private boolean skipIndexingForType( String type ) {
+        return 
CpCollectionUtils.skipIndexingForType(collectionSettingsFactory, applicationId, 
type);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 04eaf4c..1ddbac4 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -53,7 +53,7 @@ public interface AsyncEventService extends ReIndexAction, 
CollectionDeleteAction
      * @param entity The entity to index.  Should be fired when an entity is 
updated
      * @param updatedAfter
      */
-    void queueEntityIndexUpdate(final ApplicationScope applicationScope, final 
Entity entity, long updatedAfter);
+    void queueEntityIndexUpdate(final ApplicationScope applicationScope, final 
Entity entity, long updatedAfter, Boolean async);
 
 
     /**
@@ -63,10 +63,10 @@ public interface AsyncEventService extends ReIndexAction, 
CollectionDeleteAction
      * TODO: We shouldn't take an entity here, only the id. It doesn't make 
sense in a distributed context
      *
      * @param applicationScope
-     * @param entity
+     * @param entityId
      * @param newEdge
      */
-    void queueNewEdge(final ApplicationScope applicationScope, final Entity 
entity, final Edge newEdge);
+    void queueNewEdge(final ApplicationScope applicationScope, final Id 
entityId, final Edge newEdge, Boolean async);
 
     /**
      * Queue the deletion of an edge

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 3d06cae..3e67110 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -276,15 +276,26 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
      * Offer the EntityIdScope to SQS
      */
     private void offer(final Serializable operation) {
-        offer(operation, AsyncEventQueueType.REGULAR);
+        offer(operation, AsyncEventQueueType.REGULAR, null);
     }
 
-    private void offer(final Serializable operation, AsyncEventQueueType 
queueType) {
+    /**
+     * Offer the EntityIdScope to SQS
+     */
+    private void offer(final Serializable operation, Boolean async) {
+        offer(operation, AsyncEventQueueType.REGULAR, async);
+    }
+
+     /**
+      * Offer the EntityIdScope to SQS
+      */
+    private void offer(final Serializable operation, AsyncEventQueueType 
queueType, Boolean async) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            getQueue(queueType).sendMessageToLocalRegion(operation);
+            getQueue(queueType).sendMessageToLocalRegion(operation, async);
+
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -299,7 +310,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
 
         try {
             //signal to SQS
-            getQueue(queueType).sendMessageToAllRegions(operation);
+            getQueue(queueType).sendMessageToAllRegions(operation,null);
+
         }
         catch ( IOException e ) {
             throw new RuntimeException( "Unable to queue message", e );
@@ -530,7 +542,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
 
     @Override
     public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
-                                       final Entity entity, long updatedAfter) 
{
+                                       final Entity entity, long updatedAfter, 
Boolean async) {
 
 
         if (logger.isTraceEnabled()) {
@@ -538,13 +550,18 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
                 entity.getId().getUuid(), entity.getId().getType());
         }
 
-        offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),
-            new EntityIdScope(applicationScope, entity.getId()), 
updatedAfter));
+
+        EntityIndexEvent event = new 
EntityIndexEvent(queueFig.getPrimaryRegion(),
+            new EntityIdScope(applicationScope, entity.getId()),
+            updatedAfter);
+
+        offer(event, async);
 
     }
 
     private IndexOperationMessage handleEntityIndexUpdate(final 
LegacyQueueMessage message) {
 
+
         Preconditions.checkNotNull( message, "Queue Message cannot be null for 
handleEntityIndexUpdate" );
 
         final AsyncEvent event = ( AsyncEvent ) message.getBody();
@@ -574,15 +591,16 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
 
     @Override
     public void queueNewEdge(final ApplicationScope applicationScope,
-                             final Entity entity,
-                             final Edge newEdge) {
+                             final Id entityId,
+                             final Edge newEdge,
+                             Boolean async) {
 
         if (logger.isTraceEnabled()) {
             logger.trace("Offering EdgeIndexEvent for edge type {} entity 
{}:{}",
-                newEdge.getType(), entity.getId().getUuid(), 
entity.getId().getType());
+                newEdge.getType(), entityId.getUuid(), entityId.getType());
         }
 
-        offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), 
applicationScope, entity.getId(), newEdge ));
+        offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), 
applicationScope, entityId, newEdge ), async);
 
     }
 
@@ -620,7 +638,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         }
 
         // sent in region (not offerTopic) as the delete IO happens in-region, 
then queues a multi-region de-index op
-        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), 
applicationScope, edge ), AsyncEventQueueType.DELETE );
+        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), 
applicationScope, edge ), AsyncEventQueueType.DELETE , null);
     }
 
     private IndexOperationMessage  handleEdgeDelete(final LegacyQueueMessage 
message) {
@@ -824,7 +842,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
 
         // sent in region (not offerTopic) as the delete IO happens in-region, 
then queues a multi-region de-index op
         offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new 
EntityIdScope( applicationScope, entityId ) ),
-            AsyncEventQueueType.DELETE );
+            AsyncEventQueueType.DELETE , null);
     }
 
     private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage 
message) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 7c72b72..203d32a 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
 
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
@@ -303,6 +304,7 @@ public class EventBuilderImpl implements EventBuilder {
         return indexService.deIndexOldVersions( applicationScope, entityId,
             getVersionsOlderThanOrEqualToMarked(ecm, entityId, markedVersion));
 
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
new file mode 100644
index 0000000..cef6d12
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.util;
+
+import com.google.common.base.Optional;
+import org.apache.usergrid.corepersistence.index.CollectionSettings;
+import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
+import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
+
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import java.util.*;
+
+import static org.apache.usergrid.persistence.Schema.*;
+
+
+/**
+ *
+ * Helper methods to manage the Collection setting properties
+ *
+ */
+public class CpCollectionUtils {
+
+
+    public static final String SETTING_FIELDS = "fields";
+    public static final String SETTING_QUEUE_INDEX = "queueIndex";
+
+    private static Set<String> VALID_SETTING_NAMES = new HashSet<>();
+
+    static {
+        VALID_SETTING_NAMES.add(SETTING_FIELDS);
+        VALID_SETTING_NAMES.add(SETTING_QUEUE_INDEX);
+    }
+
+    public static Set<String> getValidSettings() {
+        return VALID_SETTING_NAMES;
+    }
+
+    public static Boolean asyncIndexingForType(CollectionSettingsFactory 
collectionSettingsFactory, UUID applicationId, String type ) {
+
+        String indexing = getFieldForType(applicationId, 
collectionSettingsFactory, type, SETTING_QUEUE_INDEX);
+        if ("async".equals(indexing)) {
+            return Boolean.TRUE;
+        }
+        if ("sync".equals(indexing)) {
+            return Boolean.FALSE;
+        }
+        return null;
+    }
+
+    public static boolean skipIndexingForType(CollectionSettingsFactory 
collectionSettingsFactory, UUID applicationId, String type ) {
+
+        String fields = getFieldForType(applicationId, 
collectionSettingsFactory, type, SETTING_FIELDS);
+        boolean skipIndexing = false;
+        if ( fields != null && fields instanceof String && 
"none".equalsIgnoreCase( fields.toString())) {
+            skipIndexing = true;
+        }
+
+        return skipIndexing;
+    }
+
+    // these same methods are in CpEntityManager must refactor
+    private static String getFieldForType(UUID applicationId, 
CollectionSettingsFactory collectionSettingsFactory,
+                                          String type, String keyName ) {
+
+        String collectionName = Schema.defaultCollectionName( type );
+
+        CollectionSettings collectionSettings = collectionSettingsFactory
+            .getInstance( new 
CollectionSettingsScopeImpl(getAppIdObject(applicationId), collectionName) );
+        Optional<Map<String, Object>> existingSettings =
+            collectionSettings.getCollectionSettings( collectionName );
+
+        if ( existingSettings.isPresent()) {
+            Map jsonMapData = existingSettings.get();
+            Object value = jsonMapData.get(keyName);
+            if ( value != null) {
+                return value.toString();
+            }
+        }
+        return null;
+    }
+
+    private static Id getAppIdObject(UUID applicationId){
+        return new SimpleId( applicationId, TYPE_APPLICATION );
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index cecc3b2..766e2b2 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -144,7 +144,7 @@ public abstract class AsyncIndexServiceTest {
 
 
         //queue up processing
-        asyncEventService.queueEntityIndexUpdate( applicationScope, 
testEntity, 0);
+        asyncEventService.queueEntityIndexUpdate( applicationScope, 
testEntity, 0, null);
 
 
         final EntityIndex EntityIndex =

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
index 0ebcc7b..f19bede 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
@@ -109,4 +109,9 @@ public interface LegacyQueueFig extends GuicyFig {
     @Key("usergrid.queue.map.message.timeout")
     @Default("900000") // 15 minutes
     int getMapMessageTimeout();
+
+    @Key("usergrid.queue.is.async")
+    @Default("true")
+    boolean isAsyncQueue();
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
index f153610..148cb5d 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
@@ -66,6 +66,13 @@ public interface LegacyQueueManager {
      * @param bodies body objects must be serializable
      * @throws IOException
      */
+    void sendMessagesAsync(List bodies) throws IOException;
+
+    /**
+     * send messages to queue
+     * @param bodies body objects must be serializable
+     * @throws IOException
+     */
     void sendMessages(List bodies) throws IOException;
 
     /**
@@ -81,13 +88,13 @@ public interface LegacyQueueManager {
      * @param body
      * @throws IOException
      */
-    <T extends Serializable> void sendMessageToLocalRegion(T body)throws 
IOException;
+    <T extends Serializable> void sendMessageToLocalRegion(T body, Boolean 
async)throws IOException;
 
     /**
      * Send a messae to the topic to be sent to other queues
      * @param body
      */
-    <T extends Serializable> void sendMessageToAllRegions(T body) throws 
IOException;
+    <T extends Serializable> void sendMessageToAllRegions(T body, Boolean 
async) throws IOException;
 
     /**
      * purge messages

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
index cbba0b1..1f07b58 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
@@ -79,6 +79,11 @@ public class LocalQueueManager implements LegacyQueueManager 
{
     }
 
     @Override
+    public void sendMessagesAsync(List bodies) throws IOException {
+        sendMessages(bodies);
+    }
+
+    @Override
     public  void sendMessages(List bodies) throws IOException {
         for(Object body : bodies){
             String uuid = UUID.randomUUID().toString();
@@ -108,7 +113,7 @@ public class LocalQueueManager implements 
LegacyQueueManager {
 
 
     @Override
-    public <T extends Serializable> void sendMessageToLocalRegion(final T body 
) throws IOException {
+    public <T extends Serializable> void sendMessageToLocalRegion(final T 
body, Boolean async) throws IOException {
         String uuid = UUID.randomUUID().toString();
         try {
             queue.offer(new LegacyQueueMessage(uuid, "handle_" + uuid, body, 
"put type here"),5000,TimeUnit.MILLISECONDS);
@@ -120,8 +125,8 @@ public class LocalQueueManager implements 
LegacyQueueManager {
 
 
     @Override
-    public <T extends Serializable> void sendMessageToAllRegions(final T body 
) throws IOException {
-       sendMessageToLocalRegion( body );
+    public <T extends Serializable> void sendMessageToAllRegions(final T body, 
Boolean async) throws IOException {
+       sendMessageToLocalRegion( body, null );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index e7fa47b..59110df 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -101,7 +101,7 @@ public class QakkaQueueManager implements 
LegacyQueueManager {
 
 
     @Override
-    public <T extends Serializable> void sendMessageToLocalRegion(T body) 
throws IOException {
+    public <T extends Serializable> void sendMessageToLocalRegion(T body, 
Boolean async) throws IOException {
         List<String> regionsList = regions.getRegions( Regions.LOCAL );
         logger.trace( "Sending message to queue {} local region {}", 
scope.getName(), regionsList );
         doSendMessage( body, regionsList );
@@ -109,7 +109,7 @@ public class QakkaQueueManager implements 
LegacyQueueManager {
 
 
     @Override
-    public <T extends Serializable> void sendMessageToAllRegions(T body) 
throws IOException {
+    public <T extends Serializable> void sendMessageToAllRegions(T body, 
Boolean async) throws IOException {
         List<String> regionsList = regions.getRegions( Regions.ALL );
         logger.trace( "Sending message to queue {} all regions {}", 
scope.getName(), regionsList );
         doSendMessage( body, regionsList );
@@ -183,12 +183,16 @@ public class QakkaQueueManager implements 
LegacyQueueManager {
         }
     }
 
+    @Override
+    public void sendMessagesAsync( List bodies ) throws IOException {
+        sendMessages(bodies);
+    }
 
     @Override
     public void sendMessages( List bodies ) throws IOException {
 
         for ( Object body : bodies ) {
-            sendMessageToLocalRegion( (Serializable)body );
+            sendMessageToLocalRegion( (Serializable)body, null );
         }
 
     }
@@ -199,7 +203,7 @@ public class QakkaQueueManager implements 
LegacyQueueManager {
 
         List<LegacyQueueMessage> successMessages = new ArrayList<>();
         for ( LegacyQueueMessage queueMessage : queueMessages ) {
-            sendMessageToLocalRegion( (Serializable)queueMessage.getBody() );
+            sendMessageToLocalRegion( (Serializable)queueMessage.getBody() , 
null);
             successMessages.add(queueMessage);
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 5b49bc7..775a64b 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -541,7 +541,49 @@ public class SNSQueueManagerImpl implements 
LegacyQueueManager {
 
 
     @Override
-    public <T extends Serializable> void sendMessageToAllRegions(final T body 
) throws IOException {
+    public <T extends Serializable> void sendMessageToAllRegions(final T body, 
Boolean async) throws IOException {
+        boolean sendAsync = async == null ? fig.isAsyncQueue() : 
async.booleanValue();
+        if (sendAsync) {
+            sendMessageToAllRegionsAsync(body);
+        } else {
+            sendMessageToAllRegionsSync(body);
+        }
+    }
+
+
+    private <T extends Serializable> void sendMessageToAllRegionsSync(final T 
body) throws IOException {
+        if ( sns == null ) {
+            logger.error( "SNS client is null, perhaps it failed to initialize 
successfully" );
+            return;
+        }
+
+        final String stringBody = toString( body );
+
+        String topicArn = getWriteTopicArn();
+
+        if ( logger.isTraceEnabled() ) {
+            logger.trace( "Publishing Message...{} to arn: {}", stringBody, 
topicArn );
+        }
+
+        try {
+            PublishResult publishResult = sns.publish(topicArn, 
toString(body));
+            if ( logger.isTraceEnabled() ) {
+                logger.trace( "Successfully published... messageID=[{}],  
arn=[{}]", publishResult.getMessageId(),
+                    topicArn );
+            }
+        } catch (Exception e) {
+            if (logger.isErrorEnabled()) {
+                logger.error("Failed to send this message {} to SNS queue at 
{}", stringBody, topicArn);
+            }
+            sendMessageToAllRegionsAsync(body);
+        }
+
+
+
+    }
+
+
+    private <T extends Serializable> void sendMessageToAllRegionsAsync(final T 
body ) throws IOException {
         if ( snsAsync == null ) {
             logger.error( "SNS client is null, perhaps it failed to initialize 
successfully" );
             return;
@@ -574,17 +616,27 @@ public class SNSQueueManagerImpl implements 
LegacyQueueManager {
         } );
     }
 
-
     @Override
-    public void sendMessages( final List bodies ) throws IOException {
-
+    public void sendMessagesAsync( final List bodies ) throws IOException {
         if ( sqsAsync == null ) {
             logger.error( "SQS client is null, perhaps it failed to initialize 
successfully" );
             return;
         }
 
         for ( Object body : bodies ) {
-            sendMessageToLocalRegion( ( Serializable ) body );
+            sendMessageToLocalRegionAsync( ( Serializable ) body );
+        }
+    }
+
+
+    @Override
+    public void sendMessages( final List bodies ) throws IOException {
+        for ( Object body : bodies ) {
+            if (fig.isAsyncQueue()) {
+                sendMessageToLocalRegionAsync((Serializable) body);
+            } else {
+                sendMessageToLocalRegionSync((Serializable) body);
+            }
         }
     }
 
@@ -625,16 +677,57 @@ public class SNSQueueManagerImpl implements 
LegacyQueueManager {
         return successMessages;
     }
 
-
     @Override
-    public <T extends Serializable> void sendMessageToLocalRegion(final T body 
) throws IOException {
+    public <T extends Serializable> void sendMessageToLocalRegion(final T 
body, Boolean async) throws IOException {
+        boolean sendAsync = async == null ? fig.isAsyncQueue() : 
async.booleanValue();
+        if (sendAsync) {
+            sendMessageToLocalRegionAsync(body);
+        } else {
+            sendMessageToLocalRegionSync(body);
+        }
+    }
 
-        if ( sqsAsync == null ) {
+    private <T extends Serializable> void sendMessageToLocalRegionSync(final T 
body) throws IOException {
+
+        if ( sqs == null ) {
             logger.error( "SQS client is null, perhaps it failed to initialize 
successfully" );
             return;
         }
         final String stringBody = toString( body );
 
+        if (logger.isDebugEnabled()) {
+            logger.debug(" sendMessageToLocalRegion " + stringBody);
+        }
+
+        String url = getReadQueue().getUrl();
+
+        if ( logger.isTraceEnabled() ) {
+            logger.trace( "Publishing Message...{} to url: {}", stringBody, 
url );
+        }
+
+        SendMessageRequest messageRequest = new SendMessageRequest(url, 
stringBody);
+        try {
+            SendMessageResult result = sqs.sendMessage(messageRequest);
+            if (logger.isTraceEnabled()) {
+                logger.trace("Successfully published... messageID=[{}],  
arn=[{}]", result.getMessageId(),
+                    url);
+            }
+        } catch (Exception e) {
+            logger.error("Failed to send this message {}. To this address {}. 
Error was ",  messageRequest.getMessageBody(), url, e);
+            sendMessageToLocalRegionAsync(body);
+        }
+
+
+    }
+
+
+    private <T extends Serializable> void sendMessageToLocalRegionAsync(final 
T body ) throws IOException {
+
+        if ( sqsAsync == null ) {
+            logger.error( "SQS client is null, perhaps it failed to initialize 
successfully" );
+            return;
+        }
+        final String stringBody = toString( body );
         String url = getReadQueue().getUrl();
 
         if ( logger.isTraceEnabled() ) {
@@ -647,8 +740,7 @@ public class SNSQueueManagerImpl implements 
LegacyQueueManager {
 
             @Override
             public void onError( final Exception e ) {
-
-                logger.error( "Error sending message... {}", e );
+                logger.error("Failed to send this message {}. To this address 
{}. Error was ", stringBody, url, e);
             }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
index ad73dd7..c57ff8b 100644
--- 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
@@ -62,7 +62,7 @@ public class LegacyQueueManagerTest extends AbstractAkkaTest {
         LegacyQueueManager qm = qmf.getQueueManager(scope);
 
         String value = "bodytest";
-        qm.sendMessageToLocalRegion(value);
+        qm.sendMessageToLocalRegion(value, null);
 
         Thread.sleep(5000);
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index d26cd5f..b7afc2e 100644
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -356,7 +356,7 @@ public class ApplicationQueueManagerImpl implements 
ApplicationQueueManager {
                             if(logger.isTraceEnabled()) {
                                 logger.trace("Queueing notification message 
for device: {}", message.get().getDeviceId());
                             }
-                            qm.sendMessageToLocalRegion( message.get() );
+                            qm.sendMessageToLocalRegion( message.get() , null 
);
                             queueMeter.mark();
                         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index f7e107d..ae58d7d 100644
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -59,6 +59,10 @@ public class ImportQueueManager implements 
LegacyQueueManager {
 
     }
 
+    @Override
+    public void sendMessagesAsync( final List bodies ) throws IOException {
+
+    }
 
     @Override
     public void sendMessages( final List bodies ) throws IOException {
@@ -73,13 +77,13 @@ public class ImportQueueManager implements 
LegacyQueueManager {
 
 
     @Override
-    public <T extends Serializable> void sendMessageToLocalRegion(final T body 
) throws IOException {
+    public <T extends Serializable> void sendMessageToLocalRegion(final T 
body, Boolean async) throws IOException {
 
     }
 
 
     @Override
-    public <T extends Serializable> void sendMessageToAllRegions(final T body 
) throws IOException {
+    public <T extends Serializable> void sendMessageToAllRegions(final T body, 
Boolean async) throws IOException {
 
     }
 

Reply via email to