Allow submission to SNS/SQS via sync client
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9a151089 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9a151089 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9a151089 Branch: refs/heads/expose-reindex Commit: 9a15108924b29997c9ca440b467ab8006a8f8acb Parents: 173be8a Author: Peter Johnson <pjohn...@apigee.com> Authored: Wed Sep 27 08:19:20 2017 -0700 Committer: Peter Johnson <pjohn...@apigee.com> Committed: Wed Sep 27 08:19:20 2017 -0700 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 75 +++++++------ .../corepersistence/CpRelationManager.java | 52 ++++----- .../asyncevents/AsyncEventService.java | 6 +- .../asyncevents/AsyncEventServiceImpl.java | 44 +++++--- .../asyncevents/EventBuilderImpl.java | 2 + .../corepersistence/util/CpCollectionUtils.java | 104 +++++++++++++++++ .../index/AsyncIndexServiceTest.java | 2 +- .../persistence/queue/LegacyQueueFig.java | 5 + .../persistence/queue/LegacyQueueManager.java | 11 +- .../persistence/queue/LocalQueueManager.java | 11 +- .../queue/impl/QakkaQueueManager.java | 12 +- .../queue/impl/SNSQueueManagerImpl.java | 112 +++++++++++++++++-- .../queue/LegacyQueueManagerTest.java | 2 +- .../impl/ApplicationQueueManagerImpl.java | 2 +- .../services/queues/ImportQueueManager.java | 8 +- 15 files changed, 341 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 68c4ef0..1dc4a89 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -34,6 +34,7 @@ import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory; import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl; import org.apache.usergrid.corepersistence.service.CollectionService; import org.apache.usergrid.corepersistence.service.ConnectionService; +import org.apache.usergrid.corepersistence.util.CpCollectionUtils; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.mq.QueueManager; @@ -161,7 +162,6 @@ public class CpEntityManager implements EntityManager { public QueueManagerFactory queueManagerFactory; - // /** Short-term cache to keep us from reloading same Entity during single request. */ // private LoadingCache<EntityScope, org.apache.usergrid.persistence.model.entity.Entity> entityCache; @@ -521,6 +521,10 @@ public class CpEntityManager implements EntityManager { cpEntity = CpEntityMapUtils.fromMap( cpEntity, entity.getProperties(), entity.getType(), true ); + String entityType = cpEntity.getId().getType(); + boolean skipIndexingForType = skipIndexingForType(entityType); + Boolean asyncIndex = asyncIndexingForType(entityType); + try { String region = lookupAuthoritativeRegionForType( entity.getType() ); @@ -546,38 +550,32 @@ public class CpEntityManager implements EntityManager { handleWriteUniqueVerifyException( entity, wuve ); } - if ( !skipIndexingForType( cpEntity.getId().getType() ) ) { + if (!skipIndexingForType) { + indexEntity(cpEntity, asyncIndex); + deIndexOldVersionsOfEntity(cpEntity); + } + } - // queue an event to update the new entity - indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 ); + private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, Boolean async) { + // queue an event to update the new entity + indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , async); + } - // queue up an event to clean-up older versions than this one from the index - if (entityManagerFig.getDeindexOnUpdate()) { - indexService.queueDeIndexOldVersion( applicationScope, cpEntity.getId(), cpEntity.getVersion()); - } + private void deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity) { + // queue up an event to clean-up older versions than this one from the index + if (entityManagerFig.getDeindexOnUpdate()) { + indexService.queueDeIndexOldVersion( applicationScope, cpEntity.getId(), cpEntity.getVersion()); } } - private boolean skipIndexingForType( String type ) { - - boolean skipIndexing = false; - String collectionName = Schema.defaultCollectionName( type ); + private Boolean asyncIndexingForType( String type ) { + return CpCollectionUtils.asyncIndexingForType(collectionSettingsFactory, applicationId, type); - CollectionSettings collectionSettings = collectionSettingsFactory - .getInstance( new CollectionSettingsScopeImpl(getAppIdObject(), collectionName) ); - Optional<Map<String, Object>> existingSettings = - collectionSettings.getCollectionSettings( collectionName ); - - if ( existingSettings.isPresent()) { - Map jsonMapData = existingSettings.get(); - Object fields = jsonMapData.get("fields"); - if ( fields != null && "none".equalsIgnoreCase( fields.toString() ) ) { - skipIndexing = true; - } - } + } - return skipIndexing; + private boolean skipIndexingForType( String type ) { + return CpCollectionUtils.skipIndexingForType(collectionSettingsFactory, applicationId, type); } @@ -1153,7 +1151,7 @@ public class CpEntityManager implements EntityManager { //Adding graphite metrics if ( !skipIndexingForType( cpEntity.getId().getType() ) ) { - indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 ); + indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , null); } } @@ -1813,9 +1811,10 @@ public class CpEntityManager implements EntityManager { updatedSettings.put( "lastReindexed", 0 ); } - // if fields specified, then put in settings - if ( newSettings.get("fields") != null ) { - updatedSettings.put("fields", newSettings.get("fields")); + for (String validName : CpCollectionUtils.getValidSettings()) { + if (newSettings.containsKey(validName)) { + updatedSettings.put(validName, newSettings.get(validName)); + } } // if region specified @@ -2854,14 +2853,8 @@ public class CpEntityManager implements EntityManager { entity.setProperties( cpEntity ); // add to and index in collection of the application - if ( !is_application ) { - - String collectionName = Schema.defaultCollectionName( eType ); - CpRelationManager cpr = ( CpRelationManager ) getRelationManager( getApplication() ); - cpr.addToCollection( collectionName, entity ); - - // Invoke counters - incrementEntityCollection( collectionName, timestamp ); + if ( !is_application) { + updateIndexForEniity(eType, entity, timestamp); } //write to our types map @@ -2871,6 +2864,14 @@ public class CpEntityManager implements EntityManager { return entity; } + private <A extends Entity> void updateIndexForEniity(String eType, A entity, long timestamp) throws Exception { + String collectionName = Schema.defaultCollectionName( eType ); + CpRelationManager cpr = ( CpRelationManager ) getRelationManager( getApplication() ); + cpr.addToCollection( collectionName, entity ); + + // Invoke counters + incrementEntityCollection( collectionName, timestamp ); + } private void incrementEntityCollection( String collection_name, long cassandraTimestamp ) { try { http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index c02ca7d..06f06ad 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -31,6 +31,7 @@ import org.apache.usergrid.corepersistence.service.CollectionSearch; import org.apache.usergrid.corepersistence.service.CollectionService; import org.apache.usergrid.corepersistence.service.ConnectionSearch; import org.apache.usergrid.corepersistence.service.ConnectionService; +import org.apache.usergrid.corepersistence.util.CpCollectionUtils; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.*; @@ -349,6 +350,7 @@ public class CpRelationManager implements RelationManager { Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() ); org.apache.usergrid.persistence.model.entity.Entity memberEntity = ( ( CpEntityManager ) em ).load( entityId ); + Id memberEntityId = memberEntity.getId(); // don't fetch entity if we've already got one final Entity itemEntity; @@ -364,7 +366,7 @@ public class CpRelationManager implements RelationManager { } - if ( memberEntity == null ) { + if ( memberEntityId == null ) { throw new RuntimeException( "Unable to load entity uuid=" + itemRef.getUuid() + " type=" + itemRef.getType() ); } @@ -376,7 +378,7 @@ public class CpRelationManager implements RelationManager { // create graph edge connection from head entity to member entity - final Edge edge = createCollectionEdge( cpHeadEntity.getId(), collectionName, memberEntity.getId() ); + final Edge edge = createCollectionEdge( cpHeadEntity.getId(), collectionName, memberEntityId ); final String linkedCollection = collection.getLinkedCollection(); GraphManager gm = managerCache.getGraphManager( applicationScope ); @@ -387,21 +389,24 @@ public class CpRelationManager implements RelationManager { } } ).filter( writtenEdge -> linkedCollection != null ).flatMap( writtenEdge -> { final String pluralType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() ); - final Edge reverseEdge = createCollectionEdge( memberEntity.getId(), pluralType, cpHeadEntity.getId() ); + final Edge reverseEdge = createCollectionEdge( memberEntityId, pluralType, cpHeadEntity.getId() ); //reverse return gm.writeEdge( reverseEdge ).doOnNext( reverseEdgeWritten -> { - if ( !skipIndexingForType( cpHeadEntity.getId().getType() ) ) { - - indexService.queueNewEdge(applicationScope, cpHeadEntity, reverseEdge); + String entityType = cpHeadEntity.getId().getType(); + if ( !skipIndexingForType( entityType) ) { + Boolean async = asyncIndexingForType(entityType); + indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, async); } } ); } ).doOnCompleted( () -> { - if ( !skipIndexingForType( memberEntity.getId().getType() ) ) { - indexService.queueNewEdge(applicationScope, memberEntity, edge); + String entityType = memberEntity.getId().getType(); + if ( !skipIndexingForType( entityType ) ) { + Boolean async = asyncIndexingForType(entityType); + indexService.queueNewEdge(applicationScope, memberEntityId, edge, async); } @@ -731,9 +736,10 @@ public class CpRelationManager implements RelationManager { gm.writeEdge(edge).toBlocking().lastOrDefault(null); //throw an exception if this fails - if ( !skipIndexingForType( targetEntity.getId().getType() ) ) { - - indexService.queueNewEdge(applicationScope, targetEntity, edge); + String entityType = targetEntity.getId().getType(); + if ( !skipIndexingForType( entityType ) ) { + Boolean async = asyncIndexingForType(entityType); + indexService.queueNewEdge(applicationScope, targetEntity.getId(), edge, async); } // remove any duplicate edges (keeps the duplicate edge with same timestamp) @@ -1094,27 +1100,13 @@ public class CpRelationManager implements RelationManager { } - private boolean skipIndexingForType( String type ) { - - boolean skipIndexing = false; - - String collectionName = Schema.defaultCollectionName( type ); + private Boolean asyncIndexingForType( String type ) { + return CpCollectionUtils.asyncIndexingForType(collectionSettingsFactory, applicationId, type); - CollectionSettings collectionSettings = - collectionSettingsFactory. - getInstance( new CollectionSettingsScopeImpl(new SimpleId( applicationId, TYPE_APPLICATION ), collectionName ) ); - Optional<Map<String, Object>> collectionIndexingSchema = - collectionSettings.getCollectionSettings( collectionName ); - - if ( collectionIndexingSchema.isPresent()) { - Map jsonMapData = collectionIndexingSchema.get(); - final Object fields = jsonMapData.get( "fields" ); - if ( fields != null && fields instanceof String && "none".equalsIgnoreCase( fields.toString())) { - skipIndexing = true; - } - } + } - return skipIndexing; + private boolean skipIndexingForType( String type ) { + return CpCollectionUtils.skipIndexingForType(collectionSettingsFactory, applicationId, type); } /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index 04eaf4c..1ddbac4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -53,7 +53,7 @@ public interface AsyncEventService extends ReIndexAction, CollectionDeleteAction * @param entity The entity to index. Should be fired when an entity is updated * @param updatedAfter */ - void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter); + void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter, Boolean async); /** @@ -63,10 +63,10 @@ public interface AsyncEventService extends ReIndexAction, CollectionDeleteAction * TODO: We shouldn't take an entity here, only the id. It doesn't make sense in a distributed context * * @param applicationScope - * @param entity + * @param entityId * @param newEdge */ - void queueNewEdge(final ApplicationScope applicationScope, final Entity entity, final Edge newEdge); + void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, Boolean async); /** * Queue the deletion of an edge http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 3d06cae..3e67110 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -276,15 +276,26 @@ public class AsyncEventServiceImpl implements AsyncEventService { * Offer the EntityIdScope to SQS */ private void offer(final Serializable operation) { - offer(operation, AsyncEventQueueType.REGULAR); + offer(operation, AsyncEventQueueType.REGULAR, null); } - private void offer(final Serializable operation, AsyncEventQueueType queueType) { + /** + * Offer the EntityIdScope to SQS + */ + private void offer(final Serializable operation, Boolean async) { + offer(operation, AsyncEventQueueType.REGULAR, async); + } + + /** + * Offer the EntityIdScope to SQS + */ + private void offer(final Serializable operation, AsyncEventQueueType queueType, Boolean async) { final Timer.Context timer = this.writeTimer.time(); try { //signal to SQS - getQueue(queueType).sendMessageToLocalRegion(operation); + getQueue(queueType).sendMessageToLocalRegion(operation, async); + } catch (IOException e) { throw new RuntimeException("Unable to queue message", e); } finally { @@ -299,7 +310,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { try { //signal to SQS - getQueue(queueType).sendMessageToAllRegions(operation); + getQueue(queueType).sendMessageToAllRegions(operation,null); + } catch ( IOException e ) { throw new RuntimeException( "Unable to queue message", e ); @@ -530,7 +542,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { @Override public void queueEntityIndexUpdate(final ApplicationScope applicationScope, - final Entity entity, long updatedAfter) { + final Entity entity, long updatedAfter, Boolean async) { if (logger.isTraceEnabled()) { @@ -538,13 +550,18 @@ public class AsyncEventServiceImpl implements AsyncEventService { entity.getId().getUuid(), entity.getId().getType()); } - offer(new EntityIndexEvent(queueFig.getPrimaryRegion(), - new EntityIdScope(applicationScope, entity.getId()), updatedAfter)); + + EntityIndexEvent event = new EntityIndexEvent(queueFig.getPrimaryRegion(), + new EntityIdScope(applicationScope, entity.getId()), + updatedAfter); + + offer(event, async); } private IndexOperationMessage handleEntityIndexUpdate(final LegacyQueueMessage message) { + Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" ); final AsyncEvent event = ( AsyncEvent ) message.getBody(); @@ -574,15 +591,16 @@ public class AsyncEventServiceImpl implements AsyncEventService { @Override public void queueNewEdge(final ApplicationScope applicationScope, - final Entity entity, - final Edge newEdge) { + final Id entityId, + final Edge newEdge, + Boolean async) { if (logger.isTraceEnabled()) { logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}", - newEdge.getType(), entity.getId().getUuid(), entity.getId().getType()); + newEdge.getType(), entityId.getUuid(), entityId.getType()); } - offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge )); + offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entityId, newEdge ), async); } @@ -620,7 +638,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { } // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op - offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ), AsyncEventQueueType.DELETE ); + offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ), AsyncEventQueueType.DELETE , null); } private IndexOperationMessage handleEdgeDelete(final LegacyQueueMessage message) { @@ -824,7 +842,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ), - AsyncEventQueueType.DELETE ); + AsyncEventQueueType.DELETE , null); } private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage message) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index 7c72b72..203d32a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -21,6 +21,7 @@ package org.apache.usergrid.corepersistence.asyncevents; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -303,6 +304,7 @@ public class EventBuilderImpl implements EventBuilder { return indexService.deIndexOldVersions( applicationScope, entityId, getVersionsOlderThanOrEqualToMarked(ecm, entityId, markedVersion)); + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java new file mode 100644 index 0000000..cef6d12 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java @@ -0,0 +1,104 @@ +/* + * Copyright 2014 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.util; + +import com.google.common.base.Optional; +import org.apache.usergrid.corepersistence.index.CollectionSettings; +import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory; +import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl; + +import org.apache.usergrid.persistence.*; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.entity.SimpleId; + +import java.util.*; + +import static org.apache.usergrid.persistence.Schema.*; + + +/** + * + * Helper methods to manage the Collection setting properties + * + */ +public class CpCollectionUtils { + + + public static final String SETTING_FIELDS = "fields"; + public static final String SETTING_QUEUE_INDEX = "queueIndex"; + + private static Set<String> VALID_SETTING_NAMES = new HashSet<>(); + + static { + VALID_SETTING_NAMES.add(SETTING_FIELDS); + VALID_SETTING_NAMES.add(SETTING_QUEUE_INDEX); + } + + public static Set<String> getValidSettings() { + return VALID_SETTING_NAMES; + } + + public static Boolean asyncIndexingForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) { + + String indexing = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_QUEUE_INDEX); + if ("async".equals(indexing)) { + return Boolean.TRUE; + } + if ("sync".equals(indexing)) { + return Boolean.FALSE; + } + return null; + } + + public static boolean skipIndexingForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) { + + String fields = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_FIELDS); + boolean skipIndexing = false; + if ( fields != null && fields instanceof String && "none".equalsIgnoreCase( fields.toString())) { + skipIndexing = true; + } + + return skipIndexing; + } + + // these same methods are in CpEntityManager must refactor + private static String getFieldForType(UUID applicationId, CollectionSettingsFactory collectionSettingsFactory, + String type, String keyName ) { + + String collectionName = Schema.defaultCollectionName( type ); + + CollectionSettings collectionSettings = collectionSettingsFactory + .getInstance( new CollectionSettingsScopeImpl(getAppIdObject(applicationId), collectionName) ); + Optional<Map<String, Object>> existingSettings = + collectionSettings.getCollectionSettings( collectionName ); + + if ( existingSettings.isPresent()) { + Map jsonMapData = existingSettings.get(); + Object value = jsonMapData.get(keyName); + if ( value != null) { + return value.toString(); + } + } + return null; + } + + private static Id getAppIdObject(UUID applicationId){ + return new SimpleId( applicationId, TYPE_APPLICATION ); + } + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java index cecc3b2..766e2b2 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java @@ -144,7 +144,7 @@ public abstract class AsyncIndexServiceTest { //queue up processing - asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity, 0); + asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity, 0, null); final EntityIndex EntityIndex = http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java index 0ebcc7b..f19bede 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java @@ -109,4 +109,9 @@ public interface LegacyQueueFig extends GuicyFig { @Key("usergrid.queue.map.message.timeout") @Default("900000") // 15 minutes int getMapMessageTimeout(); + + @Key("usergrid.queue.is.async") + @Default("true") + boolean isAsyncQueue(); + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java index f153610..148cb5d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java @@ -66,6 +66,13 @@ public interface LegacyQueueManager { * @param bodies body objects must be serializable * @throws IOException */ + void sendMessagesAsync(List bodies) throws IOException; + + /** + * send messages to queue + * @param bodies body objects must be serializable + * @throws IOException + */ void sendMessages(List bodies) throws IOException; /** @@ -81,13 +88,13 @@ public interface LegacyQueueManager { * @param body * @throws IOException */ - <T extends Serializable> void sendMessageToLocalRegion(T body)throws IOException; + <T extends Serializable> void sendMessageToLocalRegion(T body, Boolean async)throws IOException; /** * Send a messae to the topic to be sent to other queues * @param body */ - <T extends Serializable> void sendMessageToAllRegions(T body) throws IOException; + <T extends Serializable> void sendMessageToAllRegions(T body, Boolean async) throws IOException; /** * purge messages http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java index cbba0b1..1f07b58 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java @@ -79,6 +79,11 @@ public class LocalQueueManager implements LegacyQueueManager { } @Override + public void sendMessagesAsync(List bodies) throws IOException { + sendMessages(bodies); + } + + @Override public void sendMessages(List bodies) throws IOException { for(Object body : bodies){ String uuid = UUID.randomUUID().toString(); @@ -108,7 +113,7 @@ public class LocalQueueManager implements LegacyQueueManager { @Override - public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException { + public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException { String uuid = UUID.randomUUID().toString(); try { queue.offer(new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here"),5000,TimeUnit.MILLISECONDS); @@ -120,8 +125,8 @@ public class LocalQueueManager implements LegacyQueueManager { @Override - public <T extends Serializable> void sendMessageToAllRegions(final T body ) throws IOException { - sendMessageToLocalRegion( body ); + public <T extends Serializable> void sendMessageToAllRegions(final T body, Boolean async) throws IOException { + sendMessageToLocalRegion( body, null ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java index e7fa47b..59110df 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java @@ -101,7 +101,7 @@ public class QakkaQueueManager implements LegacyQueueManager { @Override - public <T extends Serializable> void sendMessageToLocalRegion(T body) throws IOException { + public <T extends Serializable> void sendMessageToLocalRegion(T body, Boolean async) throws IOException { List<String> regionsList = regions.getRegions( Regions.LOCAL ); logger.trace( "Sending message to queue {} local region {}", scope.getName(), regionsList ); doSendMessage( body, regionsList ); @@ -109,7 +109,7 @@ public class QakkaQueueManager implements LegacyQueueManager { @Override - public <T extends Serializable> void sendMessageToAllRegions(T body) throws IOException { + public <T extends Serializable> void sendMessageToAllRegions(T body, Boolean async) throws IOException { List<String> regionsList = regions.getRegions( Regions.ALL ); logger.trace( "Sending message to queue {} all regions {}", scope.getName(), regionsList ); doSendMessage( body, regionsList ); @@ -183,12 +183,16 @@ public class QakkaQueueManager implements LegacyQueueManager { } } + @Override + public void sendMessagesAsync( List bodies ) throws IOException { + sendMessages(bodies); + } @Override public void sendMessages( List bodies ) throws IOException { for ( Object body : bodies ) { - sendMessageToLocalRegion( (Serializable)body ); + sendMessageToLocalRegion( (Serializable)body, null ); } } @@ -199,7 +203,7 @@ public class QakkaQueueManager implements LegacyQueueManager { List<LegacyQueueMessage> successMessages = new ArrayList<>(); for ( LegacyQueueMessage queueMessage : queueMessages ) { - sendMessageToLocalRegion( (Serializable)queueMessage.getBody() ); + sendMessageToLocalRegion( (Serializable)queueMessage.getBody() , null); successMessages.add(queueMessage); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index 5b49bc7..775a64b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -541,7 +541,49 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { @Override - public <T extends Serializable> void sendMessageToAllRegions(final T body ) throws IOException { + public <T extends Serializable> void sendMessageToAllRegions(final T body, Boolean async) throws IOException { + boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue(); + if (sendAsync) { + sendMessageToAllRegionsAsync(body); + } else { + sendMessageToAllRegionsSync(body); + } + } + + + private <T extends Serializable> void sendMessageToAllRegionsSync(final T body) throws IOException { + if ( sns == null ) { + logger.error( "SNS client is null, perhaps it failed to initialize successfully" ); + return; + } + + final String stringBody = toString( body ); + + String topicArn = getWriteTopicArn(); + + if ( logger.isTraceEnabled() ) { + logger.trace( "Publishing Message...{} to arn: {}", stringBody, topicArn ); + } + + try { + PublishResult publishResult = sns.publish(topicArn, toString(body)); + if ( logger.isTraceEnabled() ) { + logger.trace( "Successfully published... messageID=[{}], arn=[{}]", publishResult.getMessageId(), + topicArn ); + } + } catch (Exception e) { + if (logger.isErrorEnabled()) { + logger.error("Failed to send this message {} to SNS queue at {}", stringBody, topicArn); + } + sendMessageToAllRegionsAsync(body); + } + + + + } + + + private <T extends Serializable> void sendMessageToAllRegionsAsync(final T body ) throws IOException { if ( snsAsync == null ) { logger.error( "SNS client is null, perhaps it failed to initialize successfully" ); return; @@ -574,17 +616,27 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { } ); } - @Override - public void sendMessages( final List bodies ) throws IOException { - + public void sendMessagesAsync( final List bodies ) throws IOException { if ( sqsAsync == null ) { logger.error( "SQS client is null, perhaps it failed to initialize successfully" ); return; } for ( Object body : bodies ) { - sendMessageToLocalRegion( ( Serializable ) body ); + sendMessageToLocalRegionAsync( ( Serializable ) body ); + } + } + + + @Override + public void sendMessages( final List bodies ) throws IOException { + for ( Object body : bodies ) { + if (fig.isAsyncQueue()) { + sendMessageToLocalRegionAsync((Serializable) body); + } else { + sendMessageToLocalRegionSync((Serializable) body); + } } } @@ -625,16 +677,57 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { return successMessages; } - @Override - public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException { + public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException { + boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue(); + if (sendAsync) { + sendMessageToLocalRegionAsync(body); + } else { + sendMessageToLocalRegionSync(body); + } + } - if ( sqsAsync == null ) { + private <T extends Serializable> void sendMessageToLocalRegionSync(final T body) throws IOException { + + if ( sqs == null ) { logger.error( "SQS client is null, perhaps it failed to initialize successfully" ); return; } final String stringBody = toString( body ); + if (logger.isDebugEnabled()) { + logger.debug(" sendMessageToLocalRegion " + stringBody); + } + + String url = getReadQueue().getUrl(); + + if ( logger.isTraceEnabled() ) { + logger.trace( "Publishing Message...{} to url: {}", stringBody, url ); + } + + SendMessageRequest messageRequest = new SendMessageRequest(url, stringBody); + try { + SendMessageResult result = sqs.sendMessage(messageRequest); + if (logger.isTraceEnabled()) { + logger.trace("Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(), + url); + } + } catch (Exception e) { + logger.error("Failed to send this message {}. To this address {}. Error was ", messageRequest.getMessageBody(), url, e); + sendMessageToLocalRegionAsync(body); + } + + + } + + + private <T extends Serializable> void sendMessageToLocalRegionAsync(final T body ) throws IOException { + + if ( sqsAsync == null ) { + logger.error( "SQS client is null, perhaps it failed to initialize successfully" ); + return; + } + final String stringBody = toString( body ); String url = getReadQueue().getUrl(); if ( logger.isTraceEnabled() ) { @@ -647,8 +740,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { @Override public void onError( final Exception e ) { - - logger.error( "Error sending message... {}", e ); + logger.error("Failed to send this message {}. To this address {}. Error was ", stringBody, url, e); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java index ad73dd7..c57ff8b 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java @@ -62,7 +62,7 @@ public class LegacyQueueManagerTest extends AbstractAkkaTest { LegacyQueueManager qm = qmf.getQueueManager(scope); String value = "bodytest"; - qm.sendMessageToLocalRegion(value); + qm.sendMessageToLocalRegion(value, null); Thread.sleep(5000); http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index d26cd5f..b7afc2e 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -356,7 +356,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { if(logger.isTraceEnabled()) { logger.trace("Queueing notification message for device: {}", message.get().getDeviceId()); } - qm.sendMessageToLocalRegion( message.get() ); + qm.sendMessageToLocalRegion( message.get() , null ); queueMeter.mark(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a151089/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java index f7e107d..ae58d7d 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java @@ -59,6 +59,10 @@ public class ImportQueueManager implements LegacyQueueManager { } + @Override + public void sendMessagesAsync( final List bodies ) throws IOException { + + } @Override public void sendMessages( final List bodies ) throws IOException { @@ -73,13 +77,13 @@ public class ImportQueueManager implements LegacyQueueManager { @Override - public <T extends Serializable> void sendMessageToLocalRegion(final T body ) throws IOException { + public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException { } @Override - public <T extends Serializable> void sendMessageToAllRegions(final T body ) throws IOException { + public <T extends Serializable> void sendMessageToAllRegions(final T body, Boolean async) throws IOException { }