Added a test to prove the issue in graph. Still need to fix the version load.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/13d25941 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/13d25941 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/13d25941 Branch: refs/heads/USERGRID-933 Commit: 13d2594196bd6315af4305e95e8314cbb600ee3e Parents: 85e1e7c Author: Todd Nine <tn...@apigee.com> Authored: Tue Sep 15 13:34:34 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Tue Sep 15 13:35:21 2015 -0600 ---------------------------------------------------------------------- .../service/ConnectionServiceImpl.java | 26 ++++--- .../service/ConnectionServiceImplTest.java | 36 ++++++++-- stack/core/src/test/resources/log4j.properties | 3 +- .../migration/schema/MigrationManagerFig.java | 1 + .../impl/stage/EdgeDeleteListenerImpl.java | 4 +- .../graph/impl/stage/EdgeDeleteRepairImpl.java | 61 +++++----------- .../persistence/graph/GraphManagerIT.java | 73 +++++++++++++++++++- 7 files changed, 136 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java index 41e6f80..d60426c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java @@ -147,18 +147,18 @@ public class ConnectionServiceImpl implements ConnectionService { final Observable<ApplicationScope> applicationScopeObservable ) { - final Observable<EntityIdScope> entityIds 
=allEntityIdsObservable.getEntities( applicationScopeObservable ); + final Observable<EntityIdScope> entityIds = allEntityIdsObservable.getEntities( applicationScopeObservable ); //now we have an observable of entityIds. Walk each connection type //get all edge types for connections - return entityIds.flatMap( entityIdScope -> { + return entityIds.flatMap( entityIdScope -> { final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); final Id entityId = entityIdScope.getId(); - final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope ); + final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); - logger.debug( "Checking connections of id {} in application {}", entityId, applicationScope ); + logger.debug( "Checking connections of id {} in application {}", entityId, applicationScope ); return gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( entityId, CpNamingUtils.EDGE_CONN_PREFIX, Optional.absent() ) ) @@ -176,7 +176,7 @@ public class ConnectionServiceImpl implements ConnectionService { return gm.loadEdgesFromSource( searchByEdge ); } ) - //now that we have a stream of edges, stream all versions + //now that we have a stream of edges, stream all versions .flatMap( edge -> { logger.debug( "Found edge {}, searching for multiple versions of edge", edge ); @@ -187,19 +187,17 @@ public class ConnectionServiceImpl implements ConnectionService { return gm.loadEdgeVersions( searchByEdge ); } ) - //skip the first version since it's the one we want to retain - // validate there is only 1 version of it, delete anything > than the min - .skip( 1 ) - .flatMap( edgeToDelete -> { + //skip the first version since it's the one we want to retain + // validate there is only 1 version of it, delete anything > than the min + .skip( 1 ).flatMap( edgeToDelete -> { logger.debug( "Deleting edge {}", edgeToDelete ); //mark the edge and ignore the cleanup result return gm.markEdge( edgeToDelete ) - //delete the edge - 
.flatMap( edge -> gm.deleteEdge( edgeToDelete )); - } ) - .map( deletedEdge -> new ConnectionScope( applicationScope, deletedEdge ) ) ; - }); + //delete the edge + .flatMap( edge -> gm.deleteEdge( edgeToDelete ) ); + } ).map( deletedEdge -> new ConnectionScope( applicationScope, deletedEdge ) ); + } ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java index e03a9b3..152721a 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java @@ -23,6 +23,8 @@ import java.util.List; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.usergrid.corepersistence.TestCoreModule; import org.apache.usergrid.corepersistence.util.CpNamingUtils; @@ -53,6 +55,9 @@ import static org.junit.Assert.assertNotNull; @UseModules( TestCoreModule.class ) public class ConnectionServiceImplTest { + + private static final Logger logger = LoggerFactory.getLogger(ConnectionServiceImplTest.class); + @Inject private GraphManagerFactory graphManagerFactory; @@ -158,19 +163,23 @@ public class ConnectionServiceImplTest { final Edge written1 = gm.writeEdge( connection1 ).toBlocking().last(); + logger.info( "Wrote edge 1 with edge {}", written1 ); + //write the second final Edge connection2 = CpNamingUtils.createConnectionEdge( source, connectionType, target ); final Edge written2 = gm.writeEdge( connection2 ).toBlocking().last(); + logger.info( "Wrote 
edge 2 with edge {}", written2 ); + //write the 3rd final Edge connection3 = CpNamingUtils.createConnectionEdge( source, connectionType, target ); final Edge written3 = gm.writeEdge( connection3 ).toBlocking().last(); - + logger.info( "Wrote edge 3 with edge {}", written3 ); //now run the cleanup @@ -178,13 +187,16 @@ public class ConnectionServiceImplTest { final List<ConnectionScope> deletedConnections = connectionService.deDupeConnections( Observable.just( applicationScope ) ).toList().toBlocking().last(); - assertEquals( "2 edges deleted", 2, deletedConnections.size() ); +// assertEquals( "2 edges deleted", 2, deletedConnections.size() ); //check our oldest was deleted first - assertEquals(written2, deletedConnections.get( 0 )); - assertEquals(written3, deletedConnections.get( 1 )); + assertEdgeData( written2, deletedConnections.get( 0 ).getEdge() ); + + assertEdgeData( written3, deletedConnections.get( 1 ).getEdge() ); + + assertEquals( "2 edges deleted", 2, deletedConnections.size() ); @@ -201,4 +213,20 @@ public class ConnectionServiceImplTest { assertEquals( written1, edges.get( 0 ) ); } + + + /** + * Compares edges based on their sourceId, type, targetId and timestamp. 
It ignores the deleted flag + * @param expected + * @param asserted + */ + private void assertEdgeData(final Edge expected, final Edge asserted){ + assertEquals("SourceId the same", expected.getSourceNode(), expected.getTargetNode()); + assertEquals("TargetId the same", expected.getTargetNode(), expected.getTargetNode()); + + assertEquals("Type the same", expected.getType(), expected.getType()); + + assertEquals("Timestamp the same", expected.getTimestamp(), expected.getTimestamp()); + + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/core/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/stack/core/src/test/resources/log4j.properties b/stack/core/src/test/resources/log4j.properties index dd29671..93951fe 100644 --- a/stack/core/src/test/resources/log4j.properties +++ b/stack/core/src/test/resources/log4j.properties @@ -47,7 +47,8 @@ log4j.logger.org.apache.usergrid.persistence.PerformanceEntityRebuildIndexTest=D log4j.logger.org.apache.usergrid.corepersistence.migration=WARN #Debug our queries -log4j.logger.org.apache.usergrid.persistence.index.impl.EsApplicationEntityIndexImpl=DEBUG +#log4j.logger.org.apache.usergrid.persistence.index.impl.EsApplicationEntityIndexImpl=DEBUG +log4j.logger.org.apache.usergrid.corepersistence.service.ConnectionServiceImpl=DEBUG #log4j.logger.org.apache.usergrid.persistence.index.impl=DEBUG #log4j.logger.org.apache.usergrid.corepersistence.CpSetup=INFO http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java index b95338a..d8f3d1f 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java @@ -35,6 +35,7 @@ public interface MigrationManagerFig extends GuicyFig { String getStrategyClass(); @Key( "collections.keyspace.strategy.options" ) + @Default("replication_factor:1") String getStrategyOptions(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java index 22a5ad0..fa289bd 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java @@ -30,6 +30,7 @@ import com.google.inject.Singleton; import rx.Observable; import rx.functions.Func1; import rx.functions.Func2; +import rx.observables.MathObservable; /** @@ -69,8 +70,7 @@ public class EdgeDeleteListenerImpl implements EdgeDeleteListener { Observable<Integer> targetDelete = edgeMetaRepair .repairTargets( scope, edge.getTargetNode(), edge.getType(), maxTimestamp ); - return Observable.zip( sourceDelete, targetDelete, - ( sourceCount, targetCount ) -> sourceCount + targetCount ); + return MathObservable.sumInteger( Observable.merge( sourceDelete, targetDelete ) ); } 
); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java index 9b84978..643dcc1 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java @@ -79,52 +79,25 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair { //merge source and target then deal with the distinct values - return Observable.just( edge ).flatMap( new Func1<MarkedEdge, Observable<? extends MarkedEdge>>() { - @Override - public Observable<? extends MarkedEdge> call( final MarkedEdge edge ) { - - return getEdgeVersions( scope, edge, storageSerialization ).take( 1 ) - .doOnNext( new Action1<MarkedEdge>() { - @Override - public void call( final MarkedEdge markedEdge ) { - //it's still in the same state as it was when we queued it. Remove it - if ( edge.equals( markedEdge ) ) { - LOG.info( "Removing edge {} ", edge ); - - //remove from the commit log - - - //remove from storage - try { - storageSerialization.deleteEdge( scope, edge, timestamp ).execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to connect to casandra", e ); - } - } - } - } ); - } - } ); + return Observable.just( edge ) + .doOnNext( markedEdge -> { + //it's still in the same state as it was when we queued it. 
Remove it + LOG.info( "Removing edge {} ", markedEdge ); + + //remove from the commit log + + + //remove from storage + try { + storageSerialization.deleteEdge( scope, markedEdge, timestamp ).execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to connect to casandra", e ); + } + } + ); } - /** - * Get all edge versions <= the specified max from the source - */ - private Observable<MarkedEdge> getEdgeVersions( final ApplicationScope scope, final Edge edge, - final EdgeSerialization serialization ) { - return Observable.create( new ObservableIterator<MarkedEdge>( "edgeVersions" ) { - @Override - protected Iterator<MarkedEdge> getIterator() { - - final SimpleSearchByEdge search = - new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), - edge.getTimestamp(), SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ); - - return serialization.getEdgeVersions( scope, search ); - } - } ); - } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java index d416b1d..4b2ffc8 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java @@ -20,9 +20,9 @@ package org.apache.usergrid.persistence.graph; import java.util.Iterator; +import java.util.List; import java.util.concurrent.TimeoutException; -import com.google.common.base.Optional; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -35,11 +35,14 @@ import 
org.apache.usergrid.persistence.core.test.ITRunner; import org.apache.usergrid.persistence.core.test.UseModules; import org.apache.usergrid.persistence.core.util.IdGenerator; import org.apache.usergrid.persistence.graph.guice.TestGraphModule; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; +import com.google.common.base.Optional; import com.google.inject.Inject; import rx.Observable; @@ -836,7 +839,7 @@ public class GraphManagerIT { //now load the next page //tests that even if a prefix is specified, the last takes precedence - edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null, Optional.fromNullable("test") ); + edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null, Optional.fromNullable( "test" ) ); edges = gm.getEdgeTypesFromSource( edgeTypes ); @@ -914,7 +917,7 @@ public class GraphManagerIT { //now load the next page - edgeTypes = new SimpleSearchEdgeType( testTargetEdge2.getTargetNode(), null, Optional.fromNullable("test") ); + edgeTypes = new SimpleSearchEdgeType( testTargetEdge2.getTargetNode(), null, Optional.fromNullable( "test" ) ); edges = gm.getEdgeTypesToTarget( edgeTypes ); @@ -2321,6 +2324,70 @@ public class GraphManagerIT { em.markEdge( null ); } + + + @Test + public void testReadMultipleVersionOrder() { + + GraphManager gm = emf.createEdgeManager( scope ); + + final Id sourceId = createId( "source" ); + + final Id target = createId( "target" ); + + + //write 3 edges with 3 different timestamp + final Edge edge1 = createEdge( sourceId, "test", target, 1, false ); + + gm.writeEdge( edge1 ).toBlocking().last(); + + final Edge edge2 = createEdge( sourceId, 
"test", target, 2, false ); + + gm.writeEdge( edge2 ).toBlocking().last(); + + final Edge edge3 = createEdge( sourceId, "test", target, 3, false ); + + gm.writeEdge( edge3 ).toBlocking().last(); + + //now test retrieving it + + + final SearchByEdge searchDescending = + new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(),Long.MAX_VALUE, + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ); + + final Observable<Edge> edgesDescending = gm.loadEdgeVersions( searchDescending ); + + //search descending + final List<Edge> descending = edgesDescending.toList().toBlocking().single(); + + assertEquals( "Correct size returned", 3, descending.size() ); + + assertEquals( "Correct edges returned", edge3, descending.get( 0 ) ); + + assertEquals( "Correct edges returned", edge2, descending.get( 1 ) ); + + assertEquals( "Correct edges returned", edge1, descending.get( 2 ) ); + + + //now search ascending + + final SearchByEdge searchAscending = + new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(),Long.MAX_VALUE, + SearchByEdgeType.Order.ASCENDING, Optional.<Edge>absent() ); + + Observable<Edge> edgesAscending = gm.loadEdgeVersions( searchAscending ); + + List<Edge> ascending = edgesAscending.toList().toBlocking().single(); + + assertEquals( "Correct size returned", 3, ascending.size() ); + + assertEquals( "Correct edges returned", edge1, ascending.get( 0 ) ); + + assertEquals( "Correct edges returned", edge2, ascending.get( 1 ) ); + + assertEquals( "Correct edges returned", edge3, ascending.get( 2 ) ); + } }