Fixing logic around entity delete to ensure that UniqueValues and ReservationCache are both cleaned up.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e6c59ee3 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e6c59ee3 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e6c59ee3 Branch: refs/heads/apigee-sso-provider Commit: e6c59ee3fdc77ce32874ca6b2e0f5fc22a205a20 Parents: 917f0e3 Author: Dave Johnson <snoopd...@apache.org> Authored: Thu Jul 7 12:12:42 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Thu Jul 7 12:12:42 2016 -0400 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 2 +- .../resources/usergrid-custom-test.properties | 13 ++++ .../actorsystem/ActorSystemManagerImpl.java | 5 +- .../exception/WriteUniqueVerifyException.java | 2 +- .../mvcc/stage/delete/MarkCommit.java | 34 +++++++- .../mvcc/stage/write/WriteUniqueVerify.java | 3 +- .../uniquevalues/ReservationCache.java | 5 ++ .../uniquevalues/ReservationCacheActor.java | 13 +++- .../uniquevalues/UniqueValueActor.java | 81 +++++++++++--------- .../uniquevalues/UniqueValuesService.java | 15 ++++ .../uniquevalues/UniqueValuesServiceImpl.java | 34 ++++++-- .../uniquevalues/UniqueValuesTable.java | 16 ++-- .../uniquevalues/UniqueValuesTableImpl.java | 8 ++ 13 files changed, 170 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/e6c59ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 7006f44..41613ac 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -2863,7 +2863,7 @@ public class CpEntityManager implements EntityManager { throws DuplicateUniquePropertyExistsException { // we may have multiple conflicts, but caller expects only one - Map<String, Field> violiations = wuve.getVioliations(); + Map<String, Field> violiations = wuve.getViolations(); if ( violiations != null ) { Field conflict = violiations.get( violiations.keySet().iterator().next() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/e6c59ee3/stack/core/src/test/resources/usergrid-custom-test.properties ---------------------------------------------------------------------- diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties b/stack/core/src/test/resources/usergrid-custom-test.properties index 0fa9573..dd6612f 100644 --- a/stack/core/src/test/resources/usergrid-custom-test.properties +++ b/stack/core/src/test/resources/usergrid-custom-test.properties @@ -34,3 +34,16 @@ elasticsearch.buffer_timeout=1 # This property is required to be set and cannot be defaulted anywhere usergrid.cluster_name=usergrid +# --- Start: Usergrid cluster/actor system settings + +usergrid.cluster.enabled=true +usergrid.cluster.hostname=localhost +usergrid.cluster.region.local=us-east +usergrid.cluster.region.list=us-east +usergrid.cluster.seeds=us-east\:localhost + +collection.uniquevalues.actors=300 +collection.uniquevalues.authoritative.region=us-east + +# --- End: Usergrid cluster/actor system settings + http://git-wip-us.apache.org/repos/asf/usergrid/blob/e6c59ee3/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index 099b875..8399979 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -282,12 +282,13 @@ public class ActorSystemManagerImpl implements ActorSystemManager { String region = currentRegion; List<String> seeds = getSeedsByRegion().get( region ); - int lastColon = seeds.get(0).lastIndexOf(":") + 1; - final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon )); logger.info( "Akka Config for region {} is:\n" + " Hostname {}\n" + " Seeds {}\n", region, hostname, seeds ); + int lastColon = seeds.get(0).lastIndexOf(":") + 1; + final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon )); + Map<String, Object> configMap = new HashMap<String, Object>() {{ put( "akka", new HashMap<String, Object>() {{ http://git-wip-us.apache.org/repos/asf/usergrid/blob/e6c59ee3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java index 81165fa..a223a9e 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java @@ -40,7 +40,7 @@ public class WriteUniqueVerifyException extends CollectionRuntimeException { /** * Get map of Fields in violation, keyed by field name. */ - public Map<String, Field> getVioliations() { + public Map<String, Field> getViolations() { return violations; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/e6c59ee3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java index 460c271..70c785e 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java @@ -21,6 +21,10 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.delete; import java.util.UUID; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException; +import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesFig; +import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,12 +64,19 @@ public class MarkCommit implements Action1<CollectionIoEvent<MvccEntity>> { private final SerializationFig serializationFig; private final UniqueValueSerializationStrategy uniqueValueStrat; private final Keyspace keyspace; + private final ActorSystemFig actorSystemFig; + private final UniqueValuesFig uniqueValuesFig; + private final UniqueValuesService uniqueValuesService; @Inject public MarkCommit( final MvccLogEntrySerializationStrategy logStrat, final MvccEntitySerializationStrategy entityStrat, - final UniqueValueSerializationStrategy uniqueValueStrat, final SerializationFig serializationFig, + final UniqueValueSerializationStrategy uniqueValueStrat, + final SerializationFig serializationFig, + final ActorSystemFig actorSystemFig, + final UniqueValuesFig uniqueValuesFig, + final UniqueValuesService uniqueValuesService, final Keyspace keyspace ) { @@ -76,6 +87,9 @@ public class MarkCommit implements Action1<CollectionIoEvent<MvccEntity>> { this.entityStrat = entityStrat; this.serializationFig = serializationFig; this.uniqueValueStrat = uniqueValueStrat; + this.actorSystemFig = actorSystemFig; + this.uniqueValuesFig = uniqueValuesFig; + this.uniqueValuesService = uniqueValuesService; this.keyspace = keyspace; } @@ -111,6 +125,24 @@ public class MarkCommit implements Action1<CollectionIoEvent<MvccEntity>> { catch ( ConnectionException e ) { throw new RuntimeException( "Unable to mark entry as deleted" ); } + + // actorSystemFig may be null in testing + if ( actorSystemFig != null && actorSystemFig.getEnabled() ) { + + String region = idIoEvent.getRegion(); + if ( region == null ) { + region = uniqueValuesFig.getAuthoritativeRegion(); + } + if ( region == null ) { + region = actorSystemFig.getRegionLocal(); + } + + try { + uniqueValuesService.releaseUniqueValues( applicationScope, entityId, version, region ); + } catch (UniqueValueException e) { + throw new RuntimeException( "Unable to release unique values for entity " + entityId ); + } + } } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/e6c59ee3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java index e7ba967..1f86440 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java @@ -219,8 +219,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> logger.trace("Pre-write unique violations found, raising exception before executing first write"); } - throw new WriteUniqueVerifyException(mvccEntity, scope, - preWriteUniquenessViolations ); + throw new WriteUniqueVerifyException(mvccEntity, scope, preWriteUniquenessViolations ); } //short circuit nothing to do http://git-wip-us.apache.org/repos/asf/usergrid/blob/e6c59ee3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java index f1e68b2..8e2e51e 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java @@ -72,6 +72,11 @@ public class ReservationCache { cache.invalidate( cancellation.getConsistentHashKey() ); } + public void cancelReservation( UniqueValueActor.Response response ) { + if ( ttl == 0 ) { return; } + cache.invalidate( response.getConsistentHashKey() ); + } + public CacheStats getStats() { return cache.stats(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/e6c59ee3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java index 51f5c8c..158b099 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCacheActor.java @@ -54,12 +54,19 @@ public class ReservationCacheActor extends UntypedActor { } } else if ( msg instanceof UniqueValueActor.Cancellation ) { - UniqueValueActor.Cancellation can = (UniqueValueActor.Cancellation)msg; + UniqueValueActor.Cancellation can = (UniqueValueActor.Cancellation) msg; ReservationCache.getInstance().cancelReservation( can ); - if ( ++cancellationCount % 10 == 0 ) { - logger.info("Received {} cancellations", cancellationCount); + if (++cancellationCount % 10 == 0) { + logger.info( "Received {} cancellations", cancellationCount ); } + logger.debug("Removing cancelled {} from reservation cache", can.getConsistentHashKey()); + + } else if ( msg instanceof UniqueValueActor.Response ) { + UniqueValueActor.Response response = (UniqueValueActor.Response) msg; + ReservationCache.getInstance().cancelReservation( response ); + + logger.info("Removing completed {} from reservation cache", response.getConsistentHashKey()); } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) { logger.debug( "subscribing" ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/e6c59ee3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java index a14c63e..501037f 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java @@ -16,10 +16,7 @@ */ package org.apache.usergrid.persistence.collection.uniquevalues; -import akka.actor.ActorRef; import akka.actor.UntypedActor; -import akka.cluster.pubsub.DistributedPubSub; -import akka.cluster.pubsub.DistributedPubSubMediator; import org.apache.commons.lang3.RandomStringUtils; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -51,8 +48,6 @@ public class UniqueValueActor extends UntypedActor { // TODO: is there a way to avoid this ugly kludge? see also: ClusterSingletonRouter this.table = UniqueValuesServiceImpl.injector.getInstance( UniqueValuesTable.class ); this.actorSystemManager = UniqueValuesServiceImpl.injector.getInstance( ActorSystemManager.class ); - - //logger.info("UniqueValueActor {} is live with table {}", name, table); } @Override @@ -77,24 +72,27 @@ public class UniqueValueActor extends UntypedActor { if ( owner != null && owner.equals( res.getOwner() )) { // sender already owns this unique value - getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() ); + getSender().tell( new Response( Response.Status.IS_UNIQUE, res.getConsistentHashKey() ), + getSender() ); return; } else if ( owner != null && !owner.equals( res.getOwner() )) { // tell sender value is not unique - getSender().tell( new Response( Response.Status.NOT_UNIQUE ), getSender() ); + getSender().tell( new Response( Response.Status.NOT_UNIQUE, res.getConsistentHashKey() ), + getSender() ); return; } table.reserve( res.getApplicationScope(), res.getOwner(), res.getOwnerVersion(), res.getField() ); - getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() ); + getSender().tell( new Response( Response.Status.IS_UNIQUE, res.getConsistentHashKey() ), + getSender() ); actorSystemManager.publishToAllRegions( "content", new Reservation( res ), getSelf() ); } catch (Throwable t) { - getSender().tell( new Response( Response.Status.ERROR ), getSender() ); + getSender().tell( new Response( Response.Status.ERROR, res.getConsistentHashKey() ), getSender() ); logger.error( "Error processing request", t ); @@ -112,23 +110,29 @@ public class UniqueValueActor extends UntypedActor { if ( owner != null && !owner.equals( con.getOwner() )) { // cannot reserve, somebody else owns the unique value - getSender().tell( new Response( Response.Status.NOT_UNIQUE ), getSender() ); + Response response = new Response( Response.Status.NOT_UNIQUE, con.getConsistentHashKey()); + getSender().tell( response, getSender() ); + actorSystemManager.publishToAllRegions( "content", response, getSelf() ); return; } else if ( owner == null ) { // cannot commit without first reserving - getSender().tell( new Response( Response.Status.BAD_REQUEST ), getSender() ); + Response response = new Response( Response.Status.BAD_REQUEST, con.getConsistentHashKey()); + getSender().tell( response, getSender() ); + actorSystemManager.publishToAllRegions( "content", response, getSelf() ); return; } table.confirm( con.getApplicationScope(), con.getOwner(), con.getOwnerVersion(), con.getField() ); - getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() ); + Response response = new Response( Response.Status.IS_UNIQUE, con.getConsistentHashKey() ); + getSender().tell( response, getSender() ); - actorSystemManager.publishToAllRegions( "content", new Reservation( con ), getSelf() ); + actorSystemManager.publishToAllRegions( "content", response, getSelf() ); } catch (Throwable t) { - getSender().tell( new Response( Response.Status.ERROR ), getSender() ); + getSender().tell( new Response( Response.Status.ERROR, con.getConsistentHashKey() ), + getSender() ); logger.error( "Error processing request", t ); } finally { @@ -144,23 +148,34 @@ public class UniqueValueActor extends UntypedActor { if ( owner != null && !owner.equals( can.getOwner() )) { // cannot cancel, somebody else owns the unique value - getSender().tell( new Response( Response.Status.NOT_UNIQUE ), getSender() ); + getSender().tell( new Response( Response.Status.NOT_UNIQUE, can.getConsistentHashKey() ), + getSender() ); return; } else if ( owner == null ) { + // cannot cancel unique value that does not exist - getSender().tell( new Response( Response.Status.BAD_REQUEST ), getSender() ); + getSender().tell( new Response( Response.Status.BAD_REQUEST, can.getConsistentHashKey() ), + getSender() ); + + // unique value record may have already been cleaned up, also clear cache + actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() ); + return; } - table.confirm( can.getApplicationScope(), can.getOwner(), can.getOwnerVersion(), can.getField() ); + table.cancel( can.getApplicationScope(), can.getOwner(), can.getOwnerVersion(), can.getField() ); + + logger.debug("Removing {} from unique values table", can.getConsistentHashKey()); - getSender().tell( new Response( Response.Status.SUCCESS ), getSender() ); + getSender().tell( new Response( Response.Status.SUCCESS, can.getConsistentHashKey() ), + getSender() ); - actorSystemManager.publishToAllRegions( "content", new Reservation( can ), getSelf() ); + actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() ); } catch (Throwable t) { - getSender().tell( new Response( Response.Status.ERROR ), getSender() ); + getSender().tell( new Response( Response.Status.ERROR, can.getConsistentHashKey() ), + getSender() ); logger.error( "Error processing request", t ); } @@ -188,7 +203,7 @@ public class UniqueValueActor extends UntypedActor { this.field = field; StringBuilder sb = new StringBuilder(); - sb.append( applicationScope.getApplication() ); + sb.append( applicationScope.getApplication().getUuid() ); sb.append(":"); sb.append( owner.getType() ); sb.append(":"); @@ -198,22 +213,7 @@ public class UniqueValueActor extends UntypedActor { this.consistentHashKey = sb.toString(); } public Request( Request req ) { - - this.applicationScope = req.applicationScope; - this.owner = req.owner; - this.ownerVersion = req.ownerVersion; - this.field = req.field; - - StringBuilder sb = new StringBuilder(); - sb.append( req.applicationScope.getApplication() ); - sb.append(":"); - sb.append( req.owner.getType() ); - sb.append(":"); - sb.append( req.field.getName() ); - sb.append(":"); - sb.append( req.field.getValue().toString() ); - this.consistentHashKey = sb.toString(); - + this( req.getApplicationScope(), req.getOwner(), req.getOwnerVersion(), req.getField() ); } public ApplicationScope getApplicationScope() { return applicationScope; @@ -238,13 +238,18 @@ public class UniqueValueActor extends UntypedActor { public static class Response implements Serializable { public enum Status { IS_UNIQUE, NOT_UNIQUE, SUCCESS, ERROR, BAD_REQUEST } final Status status; + final String consistentHashKey; - public Response(Status status) { + public Response(Status status, String consistentHashKey ) { this.status = status; + this.consistentHashKey = consistentHashKey; } public Status getStatus() { return status; } + public String getConsistentHashKey() { + return consistentHashKey; + } } public static class Reservation extends Request implements Serializable { http://git-wip-us.apache.org/repos/asf/usergrid/blob/e6c59ee3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java index e9a9f50..5deddd6 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java @@ -20,8 +20,11 @@ package org.apache.usergrid.persistence.collection.uniquevalues; import org.apache.usergrid.persistence.actorsystem.RouterProducer; +import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.field.Field; import java.util.UUID; @@ -55,4 +58,16 @@ public interface UniqueValuesService extends RouterProducer { */ void confirmUniqueValues( ApplicationScope scope, Entity entity, UUID version , String region ) throws UniqueValueException; + + /** + * Release unique values held by an entity. + * + * @param scope Application scope of entity. + * @param entityId Id of Entity with unique values to be released + * @param version Version of entity. + * @param region Authoritative Region to be used for this entity or null to use current region. + * @throws UniqueValueException if unique values cannot be reserved. + */ + void releaseUniqueValues( ApplicationScope scope, Id entityId, UUID version, String region ) + throws UniqueValueException; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/e6c59ee3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java index fcee8fa..4102090 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java @@ -35,8 +35,10 @@ import com.google.inject.Singleton; import org.apache.commons.lang3.StringUtils; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; +import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.field.Field; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +46,7 @@ import scala.concurrent.Await; import scala.concurrent.Future; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -109,13 +112,12 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { try { cancelUniqueField( scope, entity, version, field, region ); } catch (Throwable ignored) { - logger.debug( "Error canceling unique field", ignored ); + logger.error( "Error canceling unique field", ignored ); } } } throw e; } - } @@ -149,6 +151,21 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } + @Override + public void releaseUniqueValues(ApplicationScope scope, Id entityId, UUID version, String region) + throws UniqueValueException { + + ready(); + + Iterator<UniqueValue> iterator = table.getUniqueValues( scope, entityId ); + + while ( iterator.hasNext() ) { + UniqueValue uniqueValue = iterator.next(); + cancelUniqueField( scope, entityId, uniqueValue.getEntityVersion(), uniqueValue.getField(), region ); + } + } + + private void reserveUniqueField( ApplicationScope scope, Entity entity, UUID version, Field field, String region ) throws UniqueValueException { @@ -177,11 +194,18 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } - private void cancelUniqueField( - ApplicationScope scope, Entity entity, UUID version, Field field, String region ) throws UniqueValueException { + private void cancelUniqueField( ApplicationScope scope, + Entity entity, UUID version, Field field, String region ) throws UniqueValueException { + + cancelUniqueField( scope, entity.getId(), version, field, region ); + } + + + private void cancelUniqueField( ApplicationScope scope, + Id entityId, UUID version, Field field, String region ) throws UniqueValueException { UniqueValueActor.Cancellation request = new UniqueValueActor.Cancellation( - scope, entity.getId(), version, field ); + scope, entityId, version, field ); if ( actorSystemManager.getCurrentRegion().equals( region ) ) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/e6c59ee3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java index 0e69ef7..1bb2534 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java @@ -19,24 +19,24 @@ package org.apache.usergrid.persistence.collection.uniquevalues; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.field.Field; +import java.util.Iterator; import java.util.UUID; public interface UniqueValuesTable { - Id lookupOwner( - ApplicationScope applicationScope, String type, Field field ) throws ConnectionException; + Id lookupOwner( ApplicationScope scope, String type, Field field ) throws ConnectionException; - void reserve( - ApplicationScope applicationScope, Id owner, UUID version, Field field ) throws ConnectionException; + void reserve( ApplicationScope scope, Id owner, UUID version, Field field ) throws ConnectionException; - void confirm( - ApplicationScope applicationScope, Id owner, UUID version, Field field ) throws ConnectionException; + void confirm( ApplicationScope scope, Id owner, UUID version, Field field ) throws ConnectionException; - void cancel( - ApplicationScope applicationScope, Id owner, UUID version, Field field ) throws ConnectionException; + void cancel( ApplicationScope scope, Id owner, UUID version, Field field ) throws ConnectionException; + + Iterator<UniqueValue> getUniqueValues(ApplicationScope scope, Id entityId ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/e6c59ee3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java index 9cb13be..2cad32c 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java @@ -33,7 +33,9 @@ import org.apache.usergrid.persistence.model.field.Field; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.UUID; @@ -83,4 +85,10 @@ public class UniqueValuesTableImpl implements UniqueValuesTable { final MutationBatch write = strat.delete( scope, uv ); write.execute(); } + + @Override + public Iterator<UniqueValue> getUniqueValues(ApplicationScope scope, Id entityId) { + return strat.getAllUniqueFields( scope, entityId ); + } + }