Add a sourceRegion field to SQS async event messages, populated from the queue's primary region (QueueFig.getRegion() is renamed to getPrimaryRegion()). Remove the unused CachedEntityCollectionManager.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a09485a3 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a09485a3 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a09485a3 Branch: refs/heads/usergrid-1007-shiro-cache Commit: a09485a3a5ac8b4217b294f4754ea8a70a7ec447 Parents: 2b22c61 Author: Michael Russo <michaelaru...@gmail.com> Authored: Fri Oct 16 13:12:42 2015 -0700 Committer: Michael Russo <michaelaru...@gmail.com> Committed: Fri Oct 16 13:12:42 2015 -0700 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 40 ++--- .../asyncevents/AsyncIndexProvider.java | 10 +- .../asyncevents/model/AsyncEvent.java | 14 +- .../asyncevents/model/EdgeDeleteEvent.java | 6 +- .../asyncevents/model/EdgeIndexEvent.java | 9 +- .../asyncevents/model/EntityDeleteEvent.java | 8 +- .../asyncevents/model/EntityIndexEvent.java | 6 +- .../model/InitializeApplicationIndexEvent.java | 4 +- .../index/AmazonAsyncEventServiceTest.java | 6 +- .../cache/CachedEntityCollectionManager.java | 147 ------------------- .../EntityCollectionManagerFactoryImpl.java | 6 - .../usergrid/persistence/queue/QueueFig.java | 2 +- .../queue/impl/SNSQueueManagerImpl.java | 6 +- .../queue/impl/SQSQueueManagerImpl.java | 2 +- 14 files changed, 64 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index 95126c6..c198674 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -29,6 +29,7 @@ import java.util.stream.Stream; import com.google.common.base.Optional; import org.apache.usergrid.persistence.index.impl.IndexProducer; +import org.apache.usergrid.persistence.queue.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,10 +57,6 @@ import org.apache.usergrid.persistence.index.IndexLocationStrategy; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; -import org.apache.usergrid.persistence.queue.QueueMessage; -import org.apache.usergrid.persistence.queue.QueueScope; import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; import com.codahale.metrics.Counter; @@ -89,6 +86,7 @@ public class AmazonAsyncEventService implements AsyncEventService { private final QueueManager queue; private final QueueScope queueScope; private final IndexProcessorFig indexProcessorFig; + private final QueueFig queueFig; private final IndexProducer indexProducer; private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final IndexLocationStrategyFactory indexLocationStrategyFactory; @@ -115,15 +113,16 @@ public class AmazonAsyncEventService implements AsyncEventService { @Inject - public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, - final IndexProcessorFig indexProcessorFig, - final IndexProducer indexProducer, - final MetricsFactory metricsFactory, - final EntityCollectionManagerFactory entityCollectionManagerFactory, - final IndexLocationStrategyFactory indexLocationStrategyFactory, - final EntityIndexFactory 
entityIndexFactory, - final EventBuilder eventBuilder, - final RxTaskScheduler rxTaskScheduler ) { + public AmazonAsyncEventService(final QueueManagerFactory queueManagerFactory, + final IndexProcessorFig indexProcessorFig, + final IndexProducer indexProducer, + final MetricsFactory metricsFactory, + final EntityCollectionManagerFactory entityCollectionManagerFactory, + final IndexLocationStrategyFactory indexLocationStrategyFactory, + final EntityIndexFactory entityIndexFactory, + final EventBuilder eventBuilder, + final RxTaskScheduler rxTaskScheduler, + QueueFig queueFig) { this.indexProducer = indexProducer; this.entityCollectionManagerFactory = entityCollectionManagerFactory; @@ -135,6 +134,7 @@ public class AmazonAsyncEventService implements AsyncEventService { this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL); this.queue = queueManagerFactory.getQueueManager(queueScope); this.indexProcessorFig = indexProcessorFig; + this.queueFig = queueFig; this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write"); this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read"); @@ -328,7 +328,7 @@ public class AmazonAsyncEventService implements AsyncEventService { public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) { IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy( applicationScope ); - offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy))); + offer(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy))); } @@ -336,7 +336,7 @@ public class AmazonAsyncEventService implements AsyncEventService { public void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity) { - offer(new EntityIndexEvent(new EntityIdScope(applicationScope, entity.getId()), 
0)); + offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),new EntityIdScope(applicationScope, entity.getId()), 0)); } @@ -371,7 +371,7 @@ public class AmazonAsyncEventService implements AsyncEventService { final Entity entity, final Edge newEdge) { - EdgeIndexEvent operation = new EdgeIndexEvent(applicationScope, entity.getId(), newEdge); + EdgeIndexEvent operation = new EdgeIndexEvent(queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge); offer( operation ); } @@ -403,7 +403,7 @@ public class AmazonAsyncEventService implements AsyncEventService { public void queueDeleteEdge(final ApplicationScope applicationScope, final Edge edge) { - offer( new EdgeDeleteEvent( applicationScope, edge ) ); + offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) ); } public Observable<IndexOperationMessage> handleEdgeDelete(final QueueMessage message) { @@ -431,7 +431,7 @@ public class AmazonAsyncEventService implements AsyncEventService { @Override public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) { - offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) ); + offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ); } @Override @@ -630,7 +630,7 @@ public class AmazonAsyncEventService implements AsyncEventService { public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) { //change to id scope to avoid serialization issues - offer( new EntityIndexEvent( new EntityIdScope( applicationScope, id ), updatedSince ) ); + offer( new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, id ), updatedSince ) ); } public void indexBatch(final List<EdgeScope> edges, final long updatedSince) { @@ -638,7 +638,7 @@ public class AmazonAsyncEventService implements AsyncEventService { List batch = new ArrayList<EdgeScope>(); for ( EdgeScope e : edges){ //change to 
id scope to avoid serialization issues - batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince)); + batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince)); } offerBatch( batch ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index e9e36f0..8b44714 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -22,12 +22,12 @@ package org.apache.usergrid.corepersistence.asyncevents; import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; import org.apache.usergrid.corepersistence.index.IndexProcessorFig; -import org.apache.usergrid.corepersistence.index.IndexService; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.impl.IndexProducer; +import org.apache.usergrid.persistence.queue.QueueFig; import org.apache.usergrid.persistence.queue.QueueManagerFactory; import com.google.inject.Inject; @@ -51,6 +51,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { private final IndexLocationStrategyFactory indexLocationStrategyFactory; private final EntityIndexFactory 
entityIndexFactory; private final IndexProducer indexProducer; + private final QueueFig queueFig; private AsyncEventService asyncEventService; @@ -64,7 +65,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { final EventBuilder eventBuilder, final IndexLocationStrategyFactory indexLocationStrategyFactory, final EntityIndexFactory entityIndexFactory, - final IndexProducer indexProducer) { + final IndexProducer indexProducer, QueueFig queueFig) { this.indexProcessorFig = indexProcessorFig; this.queueManagerFactory = queueManagerFactory; @@ -75,6 +76,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { this.indexLocationStrategyFactory = indexLocationStrategyFactory; this.entityIndexFactory = entityIndexFactory; this.indexProducer = indexProducer; + this.queueFig = queueFig; } @@ -99,10 +101,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously()); case SQS: return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler ); + entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler,queueFig ); case SNS: return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler ); + entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig); default: throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed"); } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java index 6b45297..3b91278 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.usergrid.persistence.queue.QueueFig; /** @@ -47,10 +48,21 @@ public abstract class AsyncEvent implements Serializable { @JsonProperty protected long creationTime; + @JsonProperty + protected String sourceRegion; + + // Needed for jackson, do not remove + protected AsyncEvent(){ + + } //set by default, will be overridden when de-serializing - protected AsyncEvent() { + protected AsyncEvent(String sourceRegion) { + + creationTime = System.currentTimeMillis(); + this.sourceRegion = sourceRegion; + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java index af16bac..4bbe6f5 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java @@ -24,8 +24,6 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; public final class EdgeDeleteEvent extends AsyncEvent { @@ -39,10 +37,12 @@ public final class EdgeDeleteEvent extends AsyncEvent { public EdgeDeleteEvent() { + super(); } - public EdgeDeleteEvent( ApplicationScope applicationScope, Edge edge ) { + public EdgeDeleteEvent( String sourceRegion, ApplicationScope applicationScope, Edge edge ) { + super(sourceRegion); this.applicationScope = applicationScope; this.edge = edge; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java index c89b828..6164dce 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java @@ -21,15 +21,10 @@ package org.apache.usergrid.corepersistence.asyncevents.model; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import 
org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; -import java.io.Serializable; - public final class EdgeIndexEvent extends AsyncEvent { @@ -48,9 +43,11 @@ public final class EdgeIndexEvent * Needed by jackson */ public EdgeIndexEvent() { + super(); } - public EdgeIndexEvent(ApplicationScope applicationScope, Id entityId, Edge edge) { + public EdgeIndexEvent(String sourceRegion, ApplicationScope applicationScope, Id entityId, Edge edge) { + super(sourceRegion); this.applicationScope = applicationScope; this.entityId = entityId; this.edge = edge; http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java index 847a07d..cb3ecda 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java @@ -20,11 +20,7 @@ package org.apache.usergrid.corepersistence.asyncevents.model; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Id; public final class EntityDeleteEvent extends AsyncEvent { @@ -33,9 +29,11 @@ public final class EntityDeleteEvent extends AsyncEvent { protected EntityIdScope 
entityIdScope; public EntityDeleteEvent() { + super(); } - public EntityDeleteEvent(EntityIdScope entityIdScope) { + public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope) { + super(sourceRegion); this.entityIdScope = entityIdScope; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java index a04326a..7e8184b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java @@ -20,8 +20,6 @@ package org.apache.usergrid.corepersistence.asyncevents.model; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; @@ -35,9 +33,11 @@ public final class EntityIndexEvent extends AsyncEvent { private long updatedAfter; public EntityIndexEvent() { + super(); } - public EntityIndexEvent(EntityIdScope entityIdScope, final long updatedAfter ) { + public EntityIndexEvent(String sourceRegion, EntityIdScope entityIdScope, final long updatedAfter ) { + super(sourceRegion); this.entityIdScope = entityIdScope; this.updatedAfter = updatedAfter; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java index 2e69c75..1a270d4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java @@ -40,9 +40,11 @@ public class InitializeApplicationIndexEvent extends AsyncEvent { @SuppressWarnings( "unused" ) public InitializeApplicationIndexEvent(){ + super(); } - public InitializeApplicationIndexEvent(final IndexLocationStrategy indexLocationStrategy) { + public InitializeApplicationIndexEvent(String sourceRegion, final IndexLocationStrategy indexLocationStrategy) { + super(sourceRegion); this.indexLocationStrategy = indexLocationStrategy; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java index a14437c..5b921d9 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java @@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.index; import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.impl.IndexProducer; +import org.apache.usergrid.persistence.queue.QueueFig; import 
org.junit.Rule; import org.junit.runner.RunWith; @@ -63,6 +64,9 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest { @Inject public IndexProcessorFig indexProcessorFig; + @Inject + public QueueFig queueFig; + @Inject public MetricsFactory metricsFactory; @@ -85,7 +89,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest { @Override protected AsyncEventService getAsyncEventService() { - return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler ); + return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java deleted file mode 100644 index cb050c8..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.persistence.collection.cache; - - -import java.util.Collection; -import java.util.concurrent.TimeUnit; - -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntitySet; -import org.apache.usergrid.persistence.collection.FieldSet; -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.VersionSet; -import org.apache.usergrid.persistence.core.util.Health; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.field.Field; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import rx.Observable; -import rx.functions.Action1; - - -@Singleton -public class CachedEntityCollectionManager implements EntityCollectionManager { - - /** - * The collection manager we perform real i/o from - */ - private EntityCollectionManager targetEntityCollectionManager; - - - /** Short-term cache to keep us from reloading same Entity during single request. 
*/ - private Cache<Id, Entity> entityCache; - - private Action1<Entity> cacheAdd = new Action1<Entity>() { - @Override - public void call( final Entity entity ) { - - entityCache.put( entity.getId(), entity ); - } - }; - - - @Inject - public CachedEntityCollectionManager( final EntityCacheFig entityCacheFig, - final EntityCollectionManager targetEntityCollectionManager ) { - this.targetEntityCollectionManager = targetEntityCollectionManager; - - - entityCache = CacheBuilder.newBuilder().maximumSize( entityCacheFig.getCacheSize() ) - .expireAfterWrite( entityCacheFig.getCacheTimeout(), TimeUnit.SECONDS ) - .build(); - } - - @Override - public Observable<FieldSet> getEntitiesFromFields( final String entityType, final Collection<Field> fields) { - return targetEntityCollectionManager.getEntitiesFromFields( entityType, fields ); - } - - @Override - public Observable<Entity> write( final Entity entity ) { - return targetEntityCollectionManager.write( entity ).doOnNext( cacheAdd ); - } - - - @Override - public Observable<Id> mark( final Id entityId ) { - return targetEntityCollectionManager.mark( entityId ).doOnNext( new Action1<Id>() { - @Override - public void call( final Id id ) { - entityCache.invalidate( id ); - } - } ); - } - - - @Override - public Observable<Entity> load( final Id entityId ) { - final Entity entity = entityCache.getIfPresent( entityId ); - - if ( entity != null ) { - return Observable.just( entity ); - } - - return targetEntityCollectionManager.load( entityId ).doOnNext( cacheAdd ); - - } - - - @Override - public Observable<VersionSet> getLatestVersion( final Collection<Id> entityId ) { - return targetEntityCollectionManager.getLatestVersion( entityId ); - } - - - - @Override - public Observable<Id> getIdField( final String entityType, final Field field ) { - return targetEntityCollectionManager.getIdField( entityType, field ); - } - - - @Override - public Observable<EntitySet> load( final Collection<Id> entityIds ) { - return 
targetEntityCollectionManager.load( entityIds ); - } - - - @Override - public Observable<MvccLogEntry> getVersions( final Id entityId ) { - return targetEntityCollectionManager.getVersions( entityId ); - } - - - @Override - public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) { - return targetEntityCollectionManager.delete( entries ); - } - - - @Override - public Health getHealth() { - return targetEntityCollectionManager.getHealth(); - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java index 6d8717e..45cee06 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutionException; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager; import org.apache.usergrid.persistence.collection.cache.EntityCacheFig; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart; @@ -92,11 +91,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag metricsFactory, 
serializationFig, rxTaskScheduler, scope ); -// TODO temporarily removed If we can avoid this, that would be great -// final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target ); -// -// return proxy; - return target; } } ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java index 0453a9b..6f3a3dc 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java @@ -20,7 +20,7 @@ public interface QueueFig extends GuicyFig { */ @Key( "usergrid.queue.region" ) @Default("us-east-1") - String getRegion(); + String getPrimaryRegion(); /** * Flag to determine if Usergrid should use a multi-region Amazon queue http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index d476f76..5ab1a4b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -179,8 +179,8 @@ public class SNSQueueManagerImpl implements QueueManager { final 
Map<String, String> arrQueueArns = new HashMap<>(regionNames.length + 1); final Map<String, String> topicArns = new HashMap<>(regionNames.length + 1); - arrQueueArns.put(primaryQueueArn, fig.getRegion()); - topicArns.put(primaryTopicArn, fig.getRegion()); + arrQueueArns.put(primaryQueueArn, fig.getPrimaryRegion()); + topicArns.put(primaryTopicArn, fig.getPrimaryRegion()); for (String regionName : regionNames) { @@ -523,7 +523,7 @@ public class SNSQueueManagerImpl implements QueueManager { * @return */ private Region getRegion() { - Regions regions = Regions.fromName(fig.getRegion()); + Regions regions = Regions.fromName(fig.getPrimaryRegion()); return Region.getRegion(regions); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/a09485a3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java index fa9a7ac..53532fc 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java @@ -323,7 +323,7 @@ public class SQSQueueManagerImpl implements QueueManager { * @return */ protected Region getRegion() { - Regions regions = Regions.fromName(fig.getRegion()); + Regions regions = Regions.fromName(fig.getPrimaryRegion()); Region region = Region.getRegion(regions); return region; }