Merge branch 'release-2.1.1' of https://git-wip-us.apache.org/repos/asf/usergrid into usergrid-1268-akka-211
# Conflicts: # stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java # stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/37972a2e Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/37972a2e Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/37972a2e Branch: refs/heads/datastax-cass-driver Commit: 37972a2ed8c21c144f7e56d76afef37775f5012d Parents: e6c59ee 86ae2cd Author: Dave Johnson <snoopd...@apache.org> Authored: Thu Jul 7 12:30:10 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Thu Jul 7 12:30:10 2016 -0400 ---------------------------------------------------------------------- .../EntityCollectionManagerFactoryImpl.java | 103 ++++++++--------- .../impl/EntityCollectionManagerImpl.java | 112 ++++++++----------- .../mvcc/stage/write/WriteUniqueVerify.java | 4 +- 3 files changed, 103 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/37972a2e/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java index 3877fe3,01eab7d..fcaa51d --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java @@@ -19,38 -21,37 +19,33 @@@ package org.apache.usergrid.persistence.collection.impl; --import java.util.concurrent.ExecutionException; -- ++import com.google.common.base.Preconditions; ++import com.google.common.cache.CacheBuilder; ++import com.google.common.cache.CacheLoader; ++import com.google.common.cache.LoadingCache; ++import com.google.inject.Inject; ++import com.google.inject.Singleton; ++import com.netflix.astyanax.Keyspace; +import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; --import org.apache.usergrid.persistence.collection.cache.EntityCacheFig; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact; --import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction; --import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit; --import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify; --import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart; --import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify; ++import org.apache.usergrid.persistence.collection.mvcc.stage.write.*; import org.apache.usergrid.persistence.collection.scheduler.CollectionExecutorScheduler; import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; +import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; + import org.apache.usergrid.persistence.core.astyanax.CassandraConfig; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; --import com.google.common.base.Preconditions; --import com.google.common.cache.CacheBuilder; --import com.google.common.cache.CacheLoader; --import com.google.common.cache.LoadingCache; --import com.google.inject.Inject; --import com.google.inject.Singleton; --import com.netflix.astyanax.Keyspace; ++import java.util.concurrent.ExecutionException; @@@ -60,7 -61,7 +55,6 @@@ @Singleton public class EntityCollectionManagerFactoryImpl implements EntityCollectionManagerFactory { -- private final WriteStart writeStart; private final WriteUniqueVerify writeVerifyUnique; private final WriteOptimisticVerify writeOptimisticVerify; @@@ -77,35 -78,20 +71,40 @@@ private final Keyspace keyspace; private final MetricsFactory metricsFactory; private final RxTaskScheduler rxTaskScheduler; + private final ActorSystemManager actorSystemManager; + private final UniqueValuesService uniqueValuesService; + + private final CassandraConfig cassandraConfig; private LoadingCache<ApplicationScope, EntityCollectionManager> ecmCache = CacheBuilder.newBuilder().maximumSize( 1000 ) .build( new CacheLoader<ApplicationScope, EntityCollectionManager>() { public EntityCollectionManager load( ApplicationScope scope ) { - //create the target EM that will perform logic + //create the target EM that will perform logic final EntityCollectionManager target = new EntityCollectionManagerImpl( - writeStart, writeVerifyUnique, - writeOptimisticVerify, writeCommit, rollback, markStart, markCommit, uniqueCleanup, versionCompact, - entitySerializationStrategy, uniqueValueSerializationStrategy, - mvccLogEntrySerializationStrategy, keyspace, - metricsFactory, serializationFig, - rxTaskScheduler, scope, cassandraConfig ); ++ + writeStart, + writeVerifyUnique, + writeOptimisticVerify, + writeCommit, + rollback, + markStart, + markCommit, + uniqueCleanup, + versionCompact, ++ + entitySerializationStrategy, + uniqueValueSerializationStrategy, + mvccLogEntrySerializationStrategy, ++ + keyspace, + metricsFactory, + serializationFig, + rxTaskScheduler, + actorSystemManager, + uniqueValuesService, ++ cassandraConfig, + scope ); return target; } @@@ -113,47 -99,38 +112,49 @@@ @Inject - public EntityCollectionManagerFactoryImpl( final WriteStart writeStart, final WriteUniqueVerify writeVerifyUnique, - final WriteOptimisticVerify writeOptimisticVerify, - final WriteCommit writeCommit, final RollbackAction rollback, - final MarkStart markStart, final MarkCommit markCommit, - final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact, - final SerializationFig serializationFig, final - MvccEntitySerializationStrategy entitySerializationStrategy, - final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, - final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, - final Keyspace keyspace, final EntityCacheFig entityCacheFig, - final MetricsFactory metricsFactory, - @CollectionExecutorScheduler final RxTaskScheduler rxTaskScheduler, - final CassandraConfig cassandraConfig) { - - this.writeStart = writeStart; - this.writeVerifyUnique = writeVerifyUnique; - this.writeOptimisticVerify = writeOptimisticVerify; - this.writeCommit = writeCommit; - this.rollback = rollback; - this.markStart = markStart; - this.markCommit = markCommit; - this.uniqueCleanup = uniqueCleanup; - this.versionCompact = versionCompact; - this.serializationFig = serializationFig; - this.entitySerializationStrategy = entitySerializationStrategy; - this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; + public EntityCollectionManagerFactoryImpl( - final WriteStart writeStart, - final WriteUniqueVerify writeVerifyUnique, ++ final WriteStart writeStart, ++ final WriteUniqueVerify writeVerifyUnique, + final WriteOptimisticVerify writeOptimisticVerify, - final WriteCommit writeCommit, - final RollbackAction rollback, - final MarkStart markStart, - final MarkCommit markCommit, - final UniqueCleanup uniqueCleanup, - final VersionCompact versionCompact, - final SerializationFig serializationFig, - final MvccEntitySerializationStrategy entitySerializationStrategy, - final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, ++ final WriteCommit writeCommit, ++ final RollbackAction rollback, ++ final MarkStart markStart, ++ final MarkCommit markCommit, ++ final UniqueCleanup uniqueCleanup, ++ final VersionCompact versionCompact, ++ final SerializationFig serializationFig, ++ final MvccEntitySerializationStrategy entitySerializationStrategy, ++ final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, + final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, - final Keyspace keyspace, - final EntityCacheFig entityCacheFig, - final MetricsFactory metricsFactory, @CollectionExecutorScheduler - final RxTaskScheduler rxTaskScheduler, - final ActorSystemManager actorSystemManager, - final UniqueValuesService uniqueValuesService ) { - - this.writeStart = writeStart; - this.writeVerifyUnique = writeVerifyUnique; - this.writeOptimisticVerify = writeOptimisticVerify; - this.writeCommit = writeCommit; - this.rollback = rollback; - this.markStart = markStart; - this.markCommit = markCommit; - this.uniqueCleanup = uniqueCleanup; - this.versionCompact = versionCompact; - this.serializationFig = serializationFig; - this.entitySerializationStrategy = entitySerializationStrategy; - this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; ++ final Keyspace keyspace, ++ final MetricsFactory metricsFactory, ++ @CollectionExecutorScheduler ++ final RxTaskScheduler rxTaskScheduler, ++ final ActorSystemManager actorSystemManager, ++ final UniqueValuesService uniqueValuesService, ++ final CassandraConfig cassandraConfig ) { ++ ++ this.writeStart = writeStart; ++ this.writeVerifyUnique = writeVerifyUnique; ++ this.writeOptimisticVerify = writeOptimisticVerify; ++ this.writeCommit = writeCommit; ++ this.rollback = rollback; ++ this.markStart = markStart; ++ this.markCommit = markCommit; ++ this.uniqueCleanup = uniqueCleanup; ++ this.versionCompact = versionCompact; ++ this.serializationFig = serializationFig; ++ this.entitySerializationStrategy = entitySerializationStrategy; ++ this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy; -- this.keyspace = keyspace; -- this.metricsFactory = metricsFactory; -- this.rxTaskScheduler = rxTaskScheduler; - this.actorSystemManager = actorSystemManager; - this.uniqueValuesService = uniqueValuesService; - this.cassandraConfig = cassandraConfig; ++ this.keyspace = keyspace; ++ this.metricsFactory = metricsFactory; ++ this.rxTaskScheduler = rxTaskScheduler; ++ this.actorSystemManager = actorSystemManager; ++ this.uniqueValuesService = uniqueValuesService; ++ this.cassandraConfig = cassandraConfig; } + @Override public EntityCollectionManager createCollectionManager(ApplicationScope applicationScope) { Preconditions.checkNotNull(applicationScope); http://git-wip-us.apache.org/repos/asf/usergrid/blob/37972a2e/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java index 6a04c7c,06b086c..9dce7ef --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java @@@ -19,44 -19,43 +19,32 @@@ package org.apache.usergrid.persistence.collection.impl; --import java.util.ArrayList; --import java.util.Collection; --import java.util.Collections; --import java.util.Iterator; --import java.util.List; --import java.util.UUID; -- ++import com.codahale.metrics.Timer; ++import com.google.common.base.Preconditions; ++import com.google.inject.Inject; ++import com.google.inject.assistedinject.Assisted; ++import com.netflix.astyanax.Keyspace; ++import com.netflix.astyanax.MutationBatch; ++import com.netflix.astyanax.connectionpool.OperationResult; ++import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; ++import com.netflix.astyanax.model.ColumnFamily; import com.netflix.astyanax.model.ConsistencyLevel; -import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator; -import org.apache.usergrid.persistence.core.astyanax.CassandraConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntitySet; -import org.apache.usergrid.persistence.collection.FieldSet; -import org.apache.usergrid.persistence.collection.MvccEntity; -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.VersionSet; ++import com.netflix.astyanax.model.CqlResult; ++import com.netflix.astyanax.serializers.StringSerializer; +import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; - import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; - import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator; - import org.slf4j.Logger; - import org.slf4j.LoggerFactory; - - import org.apache.usergrid.persistence.collection.EntityCollectionManager; - import org.apache.usergrid.persistence.collection.EntitySet; - import org.apache.usergrid.persistence.collection.FieldSet; - import org.apache.usergrid.persistence.collection.MvccEntity; - import org.apache.usergrid.persistence.collection.MvccLogEntry; - import org.apache.usergrid.persistence.collection.VersionSet; ++import org.apache.usergrid.persistence.collection.*; import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact; --import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction; --import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit; --import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify; --import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart; --import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify; --import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; --import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; --import org.apache.usergrid.persistence.collection.serialization.SerializationFig; --import org.apache.usergrid.persistence.collection.serialization.UniqueValue; --import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; --import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; ++import org.apache.usergrid.persistence.collection.mvcc.stage.write.*; ++import org.apache.usergrid.persistence.collection.serialization.*; ++import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator; import org.apache.usergrid.persistence.collection.serialization.impl.MinMaxLogEntryIterator; import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet; ++import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; ++import org.apache.usergrid.persistence.core.astyanax.CassandraConfig; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.metrics.ObservableTimer; import org.apache.usergrid.persistence.core.rx.ObservableIterator; @@@ -68,22 -67,22 +56,13 @@@ import org.apache.usergrid.persistence. import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.field.Field; import org.apache.usergrid.persistence.model.util.UUIDGenerator; -- --import com.codahale.metrics.Timer; --import com.google.common.base.Preconditions; --import com.google.inject.Inject; --import com.google.inject.assistedinject.Assisted; --import com.netflix.astyanax.Keyspace; --import com.netflix.astyanax.MutationBatch; --import com.netflix.astyanax.connectionpool.OperationResult; --import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; --import com.netflix.astyanax.model.ColumnFamily; --import com.netflix.astyanax.model.CqlResult; --import com.netflix.astyanax.serializers.StringSerializer; -- ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; import rx.Observable; import rx.Subscriber; ++import java.util.*; ++ /** * Simple implementation. Should perform writes, delete and load. @@@ -125,31 -125,20 +105,35 @@@ public class EntityCollectionManagerImp private final ApplicationScope applicationScope; private final RxTaskScheduler rxTaskScheduler; + private final UniqueValuesService uniqueValuesService; + private final ActorSystemManager actorSystemManager; + @Inject - public EntityCollectionManagerImpl( final WriteStart writeStart, - final WriteUniqueVerify writeVerifyUnique, - public EntityCollectionManagerImpl( final WriteStart writeStart, final WriteUniqueVerify writeVerifyUnique, -- final WriteOptimisticVerify writeOptimisticVerify, - final WriteCommit writeCommit, - final RollbackAction rollback, - final MarkStart markStart, - final MarkCommit markCommit, - final UniqueCleanup uniqueCleanup, - final VersionCompact versionCompact, - final WriteCommit writeCommit, final RollbackAction rollback, - final MarkStart markStart, final MarkCommit markCommit, - final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact, -- final MvccEntitySerializationStrategy entitySerializationStrategy, -- final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, -- final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, - final Keyspace keyspace, - final MetricsFactory metricsFactory, - final SerializationFig serializationFig, - final RxTaskScheduler rxTaskScheduler, - ActorSystemManager actorSystemManager, - UniqueValuesService uniqueValuesService, - @Assisted final ApplicationScope applicationScope ) { - final Keyspace keyspace, final MetricsFactory metricsFactory, - final SerializationFig serializationFig, final RxTaskScheduler rxTaskScheduler, - @Assisted final ApplicationScope applicationScope, - final CassandraConfig cassandraConfig) { ++ public EntityCollectionManagerImpl( ++ final WriteStart writeStart, ++ final WriteUniqueVerify writeVerifyUnique, ++ final WriteOptimisticVerify writeOptimisticVerify, ++ final WriteCommit writeCommit, ++ final RollbackAction rollback, ++ final MarkStart markStart, ++ final MarkCommit markCommit, ++ final UniqueCleanup uniqueCleanup, ++ final VersionCompact versionCompact, ++ ++ final MvccEntitySerializationStrategy entitySerializationStrategy, ++ final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, ++ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, ++ ++ final Keyspace keyspace, ++ final MetricsFactory metricsFactory, ++ final SerializationFig serializationFig, ++ final RxTaskScheduler rxTaskScheduler, ++ final ActorSystemManager actorSystemManager, ++ final UniqueValuesService uniqueValuesService, ++ final CassandraConfig cassandraConfig, ++ @Assisted final ApplicationScope applicationScope ) { + this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.entitySerializationStrategy = entitySerializationStrategy; this.uniqueCleanup = uniqueCleanup; http://git-wip-us.apache.org/repos/asf/usergrid/blob/37972a2e/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java index 1f86440,b36cb79..a3565ea --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java @@@ -170,11 -121,13 +170,11 @@@ public class WriteUniqueVerify implemen try { - // loading will retrieve the oldest unique value entry for the field // don't use read repair on this pre-write check - // use CL ALL as consistency is extremely important here, more so than performance - UniqueValueSet set = uniqueValueStrat.load(scope, ConsistencyLevel.CL_ALL, + // stronger consistency is extremely important here, more so than performance + UniqueValueSet set = uniqueValueStrat.load(scope, cassandraFig.getConsistentReadCL(), written.getEntityId().getType(), Collections.singletonList(written.getField()), false); - set.forEach(uniqueValue -> { if(!uniqueValue.getEntityId().getUuid().equals(written.getEntityId().getUuid())){