Repository: usergrid Updated Branches: refs/heads/USERGRID-909 817d7ffb4 -> 06d34dbce
Adds delta to avoid accidentally deleting a shard as it's allocated Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/06d34dbc Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/06d34dbc Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/06d34dbc Branch: refs/heads/USERGRID-909 Commit: 06d34dbce75b2868649aea3b44c651ecd3799d30 Parents: 817d7ff Author: Todd Nine <tn...@apigee.com> Authored: Tue Nov 3 13:47:52 2015 -0700 Committer: Todd Nine <tn...@apigee.com> Committed: Tue Nov 3 13:47:52 2015 -0700 ---------------------------------------------------------------------- .../impl/ScopedCacheSerializationImpl.java | 29 ++++-------- .../usergrid/persistence/graph/GraphFig.java | 20 ++++++-- .../persistence/graph/guice/GraphModule.java | 2 + .../impl/shard/ShardEntryGroup.java | 18 +++++-- .../shard/impl/NodeShardAllocationImpl.java | 49 +++++++++----------- .../shard/impl/ShardEntryGroupIterator.java | 6 ++- .../graph/GraphManagerShardConsistencyIT.java | 37 ++++++--------- .../impl/shard/NodeShardAllocationTest.java | 12 +++-- .../impl/shard/NodeShardGroupSearchTest.java | 14 +++--- .../impl/shard/ShardEntryGroupTest.java | 47 +++++++------------ .../shard/impl/ShardEntryGroupIteratorTest.java | 6 +-- stack/corepersistence/locks/pom.xml | 8 ++-- 12 files changed, 121 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java index 1439bc5..bed84c6 100644 --- a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java +++ b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.hash.Funnel; import com.google.common.hash.PrimitiveSink; @@ -79,13 +80,15 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati private static final int[] NUM_BUCKETS = {20}; /** How to funnel keys for buckets */ - private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() { + private static final Funnel<String> MAP_KEY_FUNNEL = + ( Funnel<String> ) ( key, into ) -> into.putString(key, StringHashUtils.UTF8 ); + + + /** + * One second gc grace since our columns expire + */ + private static final int GC_GRACE = 1; - @Override - public void funnel( final String key, final PrimitiveSink into ) { - into.putString(key, StringHashUtils.UTF8); - } - }; /** * Locator to get us all buckets @@ -252,18 +255,6 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati } - private class MutationBatchExec implements Callable<Void> { - private final MutationBatch myBatch; - private MutationBatchExec(MutationBatch batch) { - myBatch = batch; - } - @Override - public Void call() throws Exception { - myBatch.execute(); - return null; - } - } - private OperationResult<Void> executeBatch(MutationBatch batch) { try { @@ -284,7 +275,7 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati BytesType.class.getSimpleName(), BytesType.class.getSimpleName(), BytesType.class.getSimpleName(), - MultiTennantColumnFamilyDefinition.CacheOption.KEYS ); + MultiTennantColumnFamilyDefinition.CacheOption.KEYS, Optional.of(GC_GRACE) ); return Arrays.asList(scopedCache); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java index 82dfe51..b209835 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java @@ -43,7 +43,6 @@ public interface GraphFig extends GuicyFig { * * You will want to set this value to no more than 2x tombstone_failure_threshold to avoid failures on read during * shard compaction. - * */ String SHARD_SIZE = "usergrid.graph.shard.size"; @@ -68,6 +67,14 @@ public interface GraphFig extends GuicyFig { String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size"; + /** + * The minimum amount of time than can occur (in millis) between shard allocation and deletion. + * + * Note that you should also pad this for node clock drift. A good value for this would be 60 seconds, assuming you + * have NTP and your nodes are reasonably (< 1 second) synced + */ + String SHARD_DELETE_DELTA = "usergrid.graph.shard.delete.delta"; + @Default( "1000" ) @Key( SCAN_PAGE_SIZE ) @@ -80,9 +87,9 @@ public interface GraphFig extends GuicyFig { /** - * A 1% repair chance. On average we'll check to repair on 1 out of every 100 reads + * A 2% repair chance. On average we'll check to repair on 2 out of every 100 reads */ - @Default( ".01" ) + @Default( ".02" ) @Key( SHARD_REPAIR_CHANCE ) double getShardRepairChance(); @@ -104,6 +111,11 @@ public interface GraphFig extends GuicyFig { @Key( COUNTER_WRITE_FLUSH_QUEUE_SIZE ) int getCounterFlushQueueSize(); + @Default( "60000" ) + @Key( SHARD_DELETE_DELTA ) + long getShardDeleteDelta(); + + @Default( "CL_EACH_QUORUM" ) @Key( SHARD_WRITE_CONSISTENCY ) String getShardWriteConsistency(); @@ -114,7 +126,5 @@ public interface GraphFig extends GuicyFig { @Default( "CL_LOCAL_QUORUM" ) @Key( SHARD_READ_CONSISTENCY ) String getShardReadConsistency(); - - } http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java index 752f417..067f33a 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java @@ -64,10 +64,12 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroup import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion; import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization; import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerializationImpl; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.AsyncTaskExecutorImpl; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardGroupSearchImpl; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupDeletionImpl; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeColumnFamilies; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy; http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java index c33e0d8..1379209 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java @@ -43,6 +43,10 @@ public class ShardEntryGroup { private List<Shard> shards; + private final long delta; + + private long maxCreatedTime; + private Shard compactionTarget; private Shard rootShard; @@ -51,7 +55,9 @@ public class ShardEntryGroup { /** * The max delta we accept in milliseconds for create time to be considered a member of this group */ - public ShardEntryGroup() { + public ShardEntryGroup( final long delta ) { + Preconditions.checkArgument( delta > 0, "delta must be greater than 0" ); + this.delta = delta; this.shards = new ArrayList<>(); } @@ -101,6 +107,8 @@ public class ShardEntryGroup { private void addShardInternal( final Shard shard ) { shards.add( shard ); + maxCreatedTime = Math.max( maxCreatedTime, shard.getCreatedTime() ); + //we're changing our structure, unset the compaction target compactionTarget = null; } @@ -293,6 +301,7 @@ public class ShardEntryGroup { * We don't have enough shards to compact, ignore */ return getCompactionTarget() != null; + } @@ -346,8 +355,11 @@ public class ShardEntryGroup { @Override public String toString() { return "ShardEntryGroup{" + - "shards=" + shards + - ", compactionTarget=" + compactionTarget + + "compactionTarget=" + compactionTarget + + ", shards=" + shards + + ", delta=" + delta + + ", maxCreatedTime=" + maxCreatedTime + + ", rootShard=" + rootShard + '}'; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java index 8f9e27f..436fd74 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java @@ -74,8 +74,9 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { @Inject public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization, final EdgeColumnFamilies edgeColumnFamilies, - final ShardedEdgeSerialization shardedEdgeSerialization, final TimeService timeService, - final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction) { + final ShardedEdgeSerialization shardedEdgeSerialization, + final TimeService timeService, final GraphFig graphFig, + final ShardGroupCompaction shardGroupCompaction ) { this.edgeShardSerialization = edgeShardSerialization; this.edgeColumnFamilies = edgeColumnFamilies; this.shardedEdgeSerialization = shardedEdgeSerialization; @@ -100,26 +101,26 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { existingShards = edgeShardSerialization.getShardMetaDataLocal( scope, maxShardId, directedEdgeMeta ); } - /** - * We didn't get anything out of cassandra, so we need to create the minumum shard - */ - if ( existingShards == null || !existingShards.hasNext() ) { - + /** + * We didn't get anything out of cassandra, so we need to create the minumum shard + */ + if ( existingShards == null || !existingShards.hasNext() ) { - final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta ); - try { - batch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to connect to casandra", e ); - } - existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator(); + final MutationBatch batch = + edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta ); + try { + batch.execute(); } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to connect to casandra", e ); + } + + existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator(); } - return new ShardEntryGroupIterator( existingShards, shardGroupCompaction, scope, directedEdgeMeta ); + return new ShardEntryGroupIterator( existingShards, graphFig.getShardDeleteDelta(), shardGroupCompaction, scope, directedEdgeMeta ); } @@ -171,7 +172,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { } - final long shardSize = graphFig.getShardSize(); @@ -203,8 +203,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { if ( !edges.hasNext() ) { - logger.debug( - "Tried to allocate a new shard for group {}, but no max value could be found in that row", + logger.debug( "Tried to allocate a new shard for group {}, but no max value could be found in that row", readShards ); return false; } @@ -225,7 +224,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { //we hit a pivot shard, set it since it could be the last one we encounter if ( i % shardSize == 0 ) { marked = edges.next(); - logger.debug("Found an edge {} to split at index {}", marked, i); + logger.debug( "Found an edge {} to split at index {}", marked, i ); } else { edges.next(); @@ -233,8 +232,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { } - - /** * Sanity check in case our counters become severely out of sync with our edge state in cassandra. */ @@ -323,13 +320,13 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { return Collections.<ShardEntryGroup>emptyList().iterator(); } - return new ShardEntryGroupIterator( shards, NO_OP_COMPACTION, scope, directedEdgeMeta ); + return new ShardEntryGroupIterator( shards, graphFig.getShardDeleteDelta(), NO_OP_COMPACTION, scope, directedEdgeMeta ); } /** - * Class that just ignores compaction events, since we're already evaluating the events. A bit of a hack - * that shows we need some refactoring + * Class that just ignores compaction events, since we're already evaluating the events. A bit of a hack that shows + * we need some refactoring */ private final static class NoOpCompaction implements ShardGroupCompaction { @@ -342,6 +339,4 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { return Futures.immediateFuture( AuditResult.NOT_CHECKED ); } } - - } http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java index cbe35b6..3a1c4d8 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java @@ -26,6 +26,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> { private final ShardGroupCompaction shardGroupCompaction; private final PushbackIterator<Shard> sourceIterator; + private final long minDelta; private final ApplicationScope scope; private final DirectedEdgeMeta directedEdgeMeta; @@ -39,7 +40,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> { * @param shardIterator The iterator of all shards. Order is expected to be by the shard index from Long.MAX to * Long.MIN */ - public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, + public ShardEntryGroupIterator( final Iterator<Shard> shardIterator, final long minDelta, final ShardGroupCompaction shardGroupCompaction, final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta ) { @@ -49,6 +50,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> { this.directedEdgeMeta = directedEdgeMeta; this.sourceIterator = new PushbackIterator( shardIterator ); this.shardGroupCompaction = shardGroupCompaction; + this.minDelta = minDelta; } @@ -95,7 +97,7 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> { while ( sourceIterator.hasNext() ) { if ( next == null ) { - next = new ShardEntryGroup( ); + next = new ShardEntryGroup( minDelta); } final Shard shard = sourceIterator.next(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java index f16cc78..2ac67aa 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java @@ -54,8 +54,11 @@ import org.apache.usergrid.persistence.core.util.IdGenerator; import org.apache.usergrid.persistence.graph.guice.TestGraphModule; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation; import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardGroupSearch; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl; import org.apache.usergrid.persistence.model.entity.Id; import com.codahale.metrics.Meter; @@ -86,9 +89,6 @@ public class GraphManagerShardConsistencyIT { private static final Meter writeMeter = registry.meter( "writeThroughput" ); - private static final Slf4jReporter reporter = - Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS ) - .convertDurationsTo( TimeUnit.MILLISECONDS ).build(); private Slf4jReporter reporter; @@ -97,10 +97,6 @@ public class GraphManagerShardConsistencyIT { protected Object originalShardSize; - protected Object originalShardTimeout; - - protected Object originalShardDelta; - protected ListeningExecutorService executor; @@ -131,10 +127,14 @@ public class GraphManagerShardConsistencyIT { @After public void tearDown() { - reporter.stop(); - reporter.report(); + if(reporter != null) { + reporter.stop(); + reporter.report(); + } - executor.shutdownNow(); + if(executor != null) { + executor.shutdownNow(); + } } @@ -373,8 +373,6 @@ public class GraphManagerShardConsistencyIT { Thread.sleep( 2000 ); } - - executor.shutdownNow(); } @@ -484,10 +482,6 @@ public class GraphManagerShardConsistencyIT { final AtomicLong writeCounter = new AtomicLong(); - //min stop time the min delta + 1 cache cycle timeout - final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout(); - - log.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector, numInjectors ); @@ -498,10 +492,9 @@ public class GraphManagerShardConsistencyIT { for ( Injector injector : injectors ) { final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class ); - for ( int i = 0; i < numWorkersPerInjector; i++ ) { Future<Boolean> future = - executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) ); + executor.submit( new Worker( gmf, generator, workerWriteLimit, writeCounter ) ); futures.add( future ); } @@ -515,7 +508,7 @@ public class GraphManagerShardConsistencyIT { } //now get all our shards - final NodeShardCache cache = getInstance( injectors, NodeShardCache.class ); + final NodeShardGroupSearch nodeShardGroupSearch = getInstance( injectors, NodeShardGroupSearch.class ); final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType ); @@ -531,7 +524,7 @@ public class GraphManagerShardConsistencyIT { final Iterator<ShardEntryGroup> existingShardGroups = - cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta ); + nodeShardGroupSearch.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta ); int shardCount = 0; while ( existingShardGroups.hasNext() ) { @@ -621,7 +614,7 @@ public class GraphManagerShardConsistencyIT { shardCount = 0; //we have to get it from the cache, because this will trigger the compaction process - final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta ); + final Iterator<ShardEntryGroup> groups = nodeShardGroupSearch.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta ); ShardEntryGroup group = null; @@ -759,7 +752,7 @@ public class GraphManagerShardConsistencyIT { /** * Perform the search returning an observable edge */ - Observable<Edge> doSearch( final GraphManager manager ); + Observable<MarkedEdge> doSearch( final GraphManager manager ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java index 45047f1..ee2fb19 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java @@ -82,6 +82,8 @@ public class NodeShardAllocationTest { when( graphFig.getShardSize() ).thenReturn( 20000l ); + when(graphFig.getShardDeleteDelta()).thenReturn( 60000l ); + } @@ -114,7 +116,7 @@ public class NodeShardAllocationTest { final Shard firstShard = new Shard( 0l, 0l, true ); final Shard futureShard = new Shard( 10000l, timeservicetime, false ); - final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l ); shardEntryGroup.addShard( futureShard ); shardEntryGroup.addShard( firstShard ); @@ -160,7 +162,7 @@ public class NodeShardAllocationTest { final Shard futureShard = new Shard( 10000l, timeservicetime, true ); - final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l ); shardEntryGroup.addShard( futureShard ); @@ -209,7 +211,7 @@ public class NodeShardAllocationTest { final Shard firstShard = new Shard( 0l, 0l, true ); - final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l ); shardEntryGroup.addShard( firstShard ); final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType ); @@ -323,7 +325,7 @@ public class NodeShardAllocationTest { final Shard firstShard = new Shard( 0l, 0l, true ); - final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l ); shardEntryGroup.addShard( firstShard ); final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType ); @@ -413,7 +415,7 @@ public class NodeShardAllocationTest { final Shard firstShard = new Shard( 0l, 0l, true ); - final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + final ShardEntryGroup shardEntryGroup = new ShardEntryGroup(10000l ); shardEntryGroup.addShard( firstShard ); final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java index c9fe4a4..910dde0 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardGroupSearchTest.java @@ -94,7 +94,7 @@ public class NodeShardGroupSearchTest { final Optional max = Optional.absent(); - final ShardEntryGroup group = new ShardEntryGroup(); + final ShardEntryGroup group = new ShardEntryGroup(10000l); group.addShard( new Shard( 0, 0, true ) ); @@ -168,14 +168,14 @@ public class NodeShardGroupSearchTest { /** * Simulate returning all shards */ - final ShardEntryGroup minShardGroup = new ShardEntryGroup(); + final ShardEntryGroup minShardGroup = new ShardEntryGroup(10000l); minShardGroup.addShard( minShard ); - final ShardEntryGroup midShardGroup = new ShardEntryGroup(); + final ShardEntryGroup midShardGroup = new ShardEntryGroup(10000l); midShardGroup.addShard( midShard ); - final ShardEntryGroup maxShardGroup = new ShardEntryGroup(); + final ShardEntryGroup maxShardGroup = new ShardEntryGroup(10000l); maxShardGroup.addShard( maxShard ); @@ -211,7 +211,7 @@ public class NodeShardGroupSearchTest { //use "thenAnswer" so we always return the value, even if it's invoked more than 1 time. .thenAnswer( answer -> new ShardEntryGroupIterator( Collections.singleton( minShard ).iterator(), - NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) ); + 10000l, NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) ); /** @@ -243,7 +243,7 @@ public class NodeShardGroupSearchTest { //use "thenAnswer" so we always return the value, even if it's invoked more than 1 time. .thenAnswer( answer -> new ShardEntryGroupIterator( Arrays.asList( midShard, minShard ).iterator(), - NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) ); + 10000l, NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) ); /** @@ -276,7 +276,7 @@ public class NodeShardGroupSearchTest { //use "thenAnswer" so we always return the value, even if it's invoked more than 1 time. .thenAnswer( answer -> new ShardEntryGroupIterator( Arrays.asList( maxShard, midShard, minShard ).iterator(), - NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) ); + 10000l, NoOpShardCompaction.INSTANCE, scope, directedEdgeMeta ) ); //check getting equal to our min, mid and max http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java index d609d96..d9f8a2b 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java @@ -40,7 +40,7 @@ public class ShardEntryGroupTest { Shard rootShard = new Shard( 0, 0, false ); - ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l ); final boolean result = shardEntryGroup.addShard( rootShard ); @@ -62,7 +62,7 @@ public class ShardEntryGroupTest { Shard secondShard = new Shard( 1000, 1001, false ); - ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l ); boolean result = shardEntryGroup.addShard( secondShard ); @@ -73,7 +73,6 @@ public class ShardEntryGroupTest { assertTrue( "Shard added", result ); - assertFalse( "First shard cannot be deleted", shardEntryGroup.canBeDeleted( firstShard ) ); assertFalse( "Second shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) ); @@ -81,9 +80,6 @@ public class ShardEntryGroupTest { assertFalse( "Duplicate shard id cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) ); assertNull( "Can't compact, no min compacted shard present", shardEntryGroup.getCompactionTarget() ); - - - } @@ -97,7 +93,7 @@ public class ShardEntryGroupTest { Shard secondShard = new Shard( 1000, 1001, false ); - ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l ); boolean result = shardEntryGroup.addShard( secondShard ); @@ -123,11 +119,9 @@ public class ShardEntryGroupTest { //we should compact these assertTrue( "Merge should be run", shardEntryGroup.shouldCompact() ); - } - @Test public void lowerTimestampHigherShard() { @@ -139,7 +133,7 @@ public class ShardEntryGroupTest { Shard compactedShard = new Shard( 500, 200, true ); - ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l ); boolean result = shardEntryGroup.addShard( firstShard ); @@ -154,19 +148,15 @@ public class ShardEntryGroupTest { assertTrue( "Shard added", result ); + assertFalse( "First shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) ); - assertFalse( "First shard cannot be deleted", shardEntryGroup.canBeDeleted( secondShard ) ); - - assertTrue( "Second shard can be deleted", shardEntryGroup.canBeDeleted( firstShard ) ); - - assertEquals( "Can't compact, no min compacted shard present", secondShard, shardEntryGroup.getCompactionTarget() ); - - + assertTrue( "Second shard can be deleted", shardEntryGroup.canBeDeleted( firstShard ) ); + assertEquals( "Can't compact, no min compacted shard present", secondShard, + shardEntryGroup.getCompactionTarget() ); } - @Test public void multipleShardGroups() { @@ -179,7 +169,7 @@ public class ShardEntryGroupTest { Shard compactedShard2 = new Shard( 800, 7000, true ); - ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l ); boolean result = shardEntryGroup.addShard( firstShard ); @@ -197,7 +187,7 @@ public class ShardEntryGroupTest { assertFalse( "Shouldn't add since it's compacted", result ); - ShardEntryGroup secondGroup = new ShardEntryGroup( ); + ShardEntryGroup secondGroup = new ShardEntryGroup( 10000l ); result = secondGroup.addShard( compactedShard2 ); @@ -214,7 +204,7 @@ public class ShardEntryGroupTest { Shard compactedShard1 = new Shard( 900, 8000, true ); - ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l ); boolean result = shardEntryGroup.addShard( firstShard ); @@ -236,9 +226,7 @@ public class ShardEntryGroupTest { assertEquals( "Same shard for merge target", secondShard, shardEntryGroup.getCompactionTarget() ); //Should return true, we can merge - assertTrue( "Merge cannot be run within min time", - shardEntryGroup.shouldCompact() ); - + assertTrue( "Merge cannot be run within min time", shardEntryGroup.shouldCompact() ); } @@ -255,7 +243,7 @@ public class ShardEntryGroupTest { Shard compactedShard1 = new Shard( 900, 8000, true ); - ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l ); boolean result = shardEntryGroup.addShard( firstShard ); @@ -292,7 +280,7 @@ public class ShardEntryGroupTest { Shard compactedShard = new Shard( 900, 8000, true ); - ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l ); boolean result = shardEntryGroup.addShard( ignoredProposedShard ); @@ -307,7 +295,7 @@ public class ShardEntryGroupTest { assertTrue( "Shard added", result ); - Collection<Shard> writeShards = shardEntryGroup.getWriteShards(newAllocatedCompactionTarget.getShardIndex() ); + Collection<Shard> writeShards = shardEntryGroup.getWriteShards( newAllocatedCompactionTarget.getShardIndex() ); assertEquals( "Shard size correct", 1, writeShards.size() ); @@ -319,7 +307,6 @@ public class ShardEntryGroupTest { assertEquals( "Shard size correct", 1, writeShards.size() ); assertTrue( "Lowest new shard present", writeShards.contains( compactedShard ) ); - } @@ -332,7 +319,7 @@ public class ShardEntryGroupTest { Shard rootShard = new Shard( 0, 0, false ); - ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l ); boolean result = shardEntryGroup.addShard( secondShard ); @@ -357,7 +344,7 @@ public class ShardEntryGroupTest { Shard lowShard = new Shard( 10000, 1000, false ); - ShardEntryGroup shardEntryGroup = new ShardEntryGroup( ); + ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 10000l ); boolean result = shardEntryGroup.addShard( highShard ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java index 6a0e1e5..0ab60a3 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java @@ -59,7 +59,7 @@ public class ShardEntryGroupIteratorTest { final Iterator<Shard> noShards = Collections.<Shard>emptyList().iterator(); //should blow up, our iterator is empty - new ShardEntryGroupIterator( noShards, shardGroupCompaction, scope, directedEdgeMeta ); + new ShardEntryGroupIterator( noShards, 10000l, shardGroupCompaction, scope, directedEdgeMeta ); } @@ -77,7 +77,7 @@ public class ShardEntryGroupIteratorTest { final Iterator<Shard> noShards = Collections.singleton( minShard ).iterator(); ShardEntryGroupIterator entryGroupIterator = - new ShardEntryGroupIterator( noShards, shardGroupCompaction, scope, directedEdgeMeta ); + new ShardEntryGroupIterator( noShards, 10000l, shardGroupCompaction, scope, directedEdgeMeta ); assertTrue( "Root shard always present", entryGroupIterator.hasNext() ); @@ -154,7 +154,7 @@ public class ShardEntryGroupIteratorTest { ShardEntryGroupIterator entryGroupIterator = - new ShardEntryGroupIterator( noShards, shardGroupCompaction, scope, directedEdgeMeta ); + new ShardEntryGroupIterator( noShards, 10000l, shardGroupCompaction, scope, directedEdgeMeta ); assertTrue( "max group present", entryGroupIterator.hasNext() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/06d34dbc/stack/corepersistence/locks/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/locks/pom.xml b/stack/corepersistence/locks/pom.xml index de89053..6f6f77a 100644 --- a/stack/corepersistence/locks/pom.xml +++ b/stack/corepersistence/locks/pom.xml @@ -20,10 +20,10 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - <artifactId>persistence</artifactId> - <groupId>org.apache.usergrid</groupId> - <version>2.1.0-SNAPSHOT</version> - </parent> + <artifactId>persistence</artifactId> + <groupId>org.apache.usergrid</groupId> + <version>2.1.1-SNAPSHOT</version> + </parent> <modelVersion>4.0.0</modelVersion> <description>The module for handling all distributed locks</description>