Added a test that proves the issue in graph.  Loading of edge versions still needs to be fixed.

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/13d25941
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/13d25941
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/13d25941

Branch: refs/heads/USERGRID-933
Commit: 13d2594196bd6315af4305e95e8314cbb600ee3e
Parents: 85e1e7c
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Sep 15 13:34:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Sep 15 13:35:21 2015 -0600

----------------------------------------------------------------------
 .../service/ConnectionServiceImpl.java          | 26 ++++---
 .../service/ConnectionServiceImplTest.java      | 36 ++++++++--
 stack/core/src/test/resources/log4j.properties  |  3 +-
 .../migration/schema/MigrationManagerFig.java   |  1 +
 .../impl/stage/EdgeDeleteListenerImpl.java      |  4 +-
 .../graph/impl/stage/EdgeDeleteRepairImpl.java  | 61 +++++-----------
 .../persistence/graph/GraphManagerIT.java       | 73 +++++++++++++++++++-
 7 files changed, 136 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
index 41e6f80..d60426c 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
@@ -147,18 +147,18 @@ public class ConnectionServiceImpl implements 
ConnectionService {
         final Observable<ApplicationScope> applicationScopeObservable ) {
 
 
-        final Observable<EntityIdScope> entityIds 
=allEntityIdsObservable.getEntities( applicationScopeObservable );
+        final Observable<EntityIdScope> entityIds = 
allEntityIdsObservable.getEntities( applicationScopeObservable );
         //now we have an observable of entityIds.  Walk each connection type
 
         //get all edge types for connections
-       return  entityIds.flatMap( entityIdScope -> {
+        return entityIds.flatMap( entityIdScope -> {
 
             final ApplicationScope applicationScope = 
entityIdScope.getApplicationScope();
             final Id entityId = entityIdScope.getId();
 
-            final GraphManager gm = 
graphManagerFactory.createEdgeManager(applicationScope );
+            final GraphManager gm = graphManagerFactory.createEdgeManager( 
applicationScope );
 
-           logger.debug( "Checking connections of id {} in application {}", 
entityId, applicationScope );
+            logger.debug( "Checking connections of id {} in application {}", 
entityId, applicationScope );
 
             return gm.getEdgeTypesFromSource(
                 new SimpleSearchEdgeType( entityId, 
CpNamingUtils.EDGE_CONN_PREFIX, Optional.absent() ) )
@@ -176,7 +176,7 @@ public class ConnectionServiceImpl implements 
ConnectionService {
                     return gm.loadEdgesFromSource( searchByEdge );
                 } )
 
-                //now that we have a stream of edges, stream all versions
+                    //now that we have a stream of edges, stream all versions
                 .flatMap( edge -> {
 
                     logger.debug( "Found edge {}, searching for multiple 
versions of edge", edge );
@@ -187,19 +187,17 @@ public class ConnectionServiceImpl implements 
ConnectionService {
                     return gm.loadEdgeVersions( searchByEdge );
                 } )
 
-            //skip the first version since it's the one we want to retain
-            // validate there is only 1 version of it, delete anything > than 
the min
-                .skip( 1 )
-                .flatMap( edgeToDelete -> {
+                    //skip the first version since it's the one we want to 
retain
+                    // validate there is only 1 version of it, delete anything 
> than the min
+                .skip( 1 ).flatMap( edgeToDelete -> {
 
                     logger.debug( "Deleting edge {}", edgeToDelete );
 
                     //mark the edge and ignore the cleanup result
                     return gm.markEdge( edgeToDelete )
-                             //delete the edge
-                             .flatMap( edge -> gm.deleteEdge( edgeToDelete ));
-                } )
-                .map( deletedEdge -> new ConnectionScope( applicationScope, 
deletedEdge ) ) ;
-        });
+                        //delete the edge
+                        .flatMap( edge -> gm.deleteEdge( edgeToDelete ) );
+                } ).map( deletedEdge -> new ConnectionScope( applicationScope, 
deletedEdge ) );
+        } );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
index e03a9b3..152721a 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
@@ -23,6 +23,8 @@ import java.util.List;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.TestCoreModule;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -53,6 +55,9 @@ import static org.junit.Assert.assertNotNull;
 @UseModules( TestCoreModule.class )
 public class ConnectionServiceImplTest {
 
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ConnectionServiceImplTest.class);
+
     @Inject
     private GraphManagerFactory graphManagerFactory;
 
@@ -158,19 +163,23 @@ public class ConnectionServiceImplTest {
 
         final Edge written1 = gm.writeEdge( connection1 ).toBlocking().last();
 
+        logger.info( "Wrote edge 1 with edge {}", written1 );
+
 
         //write the second
         final Edge connection2 = CpNamingUtils.createConnectionEdge( source, 
connectionType, target );
 
         final Edge written2 = gm.writeEdge( connection2 ).toBlocking().last();
 
+        logger.info( "Wrote edge 2 with edge {}", written2 );
+
 
         //write the 3rd
         final Edge connection3 = CpNamingUtils.createConnectionEdge( source, 
connectionType, target );
 
         final Edge written3 = gm.writeEdge( connection3 ).toBlocking().last();
 
-
+        logger.info( "Wrote edge 3 with edge {}", written3 );
 
 
         //now run the cleanup
@@ -178,13 +187,16 @@ public class ConnectionServiceImplTest {
         final List<ConnectionScope> deletedConnections =
             connectionService.deDupeConnections( Observable.just( 
applicationScope ) ).toList().toBlocking().last();
 
-        assertEquals( "2 edges deleted", 2, deletedConnections.size() );
+//        assertEquals( "2 edges deleted", 2, deletedConnections.size() );
 
         //check our oldest was deleted first
 
-        assertEquals(written2, deletedConnections.get( 0 ));
 
-        assertEquals(written3, deletedConnections.get( 1 ));
+        assertEdgeData( written2, deletedConnections.get( 0 ).getEdge() );
+
+        assertEdgeData( written3, deletedConnections.get( 1 ).getEdge() );
+
+        assertEquals( "2 edges deleted", 2, deletedConnections.size() );
 
 
 
@@ -201,4 +213,20 @@ public class ConnectionServiceImplTest {
 
         assertEquals( written1, edges.get( 0 ) );
     }
+
+
+    /**
+     * Compares edges based on their sourceId, type, targetId and timestamp. 
It ignores the deleted flag.
+     * @param expected the edge holding the values we expect to see
+     * @param asserted the edge under test, checked field by field against expected
+     */
+    private void assertEdgeData(final Edge expected, final Edge asserted){
+        assertEquals("SourceId the same", expected.getSourceNode(), 
asserted.getSourceNode());
+        assertEquals("TargetId the same", expected.getTargetNode(), 
asserted.getTargetNode());
+
+        assertEquals("Type the same", expected.getType(), asserted.getType());
+
+        assertEquals("Timestamp the same", expected.getTimestamp(), 
asserted.getTimestamp());
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/log4j.properties 
b/stack/core/src/test/resources/log4j.properties
index dd29671..93951fe 100644
--- a/stack/core/src/test/resources/log4j.properties
+++ b/stack/core/src/test/resources/log4j.properties
@@ -47,7 +47,8 @@ 
log4j.logger.org.apache.usergrid.persistence.PerformanceEntityRebuildIndexTest=D
 log4j.logger.org.apache.usergrid.corepersistence.migration=WARN
 
 #Debug our queries
-log4j.logger.org.apache.usergrid.persistence.index.impl.EsApplicationEntityIndexImpl=DEBUG
+#log4j.logger.org.apache.usergrid.persistence.index.impl.EsApplicationEntityIndexImpl=DEBUG
+log4j.logger.org.apache.usergrid.corepersistence.service.ConnectionServiceImpl=DEBUG
 
 #log4j.logger.org.apache.usergrid.persistence.index.impl=DEBUG
 #log4j.logger.org.apache.usergrid.corepersistence.CpSetup=INFO

http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
index b95338a..d8f3d1f 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
@@ -35,6 +35,7 @@ public interface MigrationManagerFig extends GuicyFig {
     String getStrategyClass();
 
     @Key( "collections.keyspace.strategy.options" )
+    @Default("replication_factor:1")
     String getStrategyOptions();
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
index 22a5ad0..fa289bd 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
@@ -30,6 +30,7 @@ import com.google.inject.Singleton;
 import rx.Observable;
 import rx.functions.Func1;
 import rx.functions.Func2;
+import rx.observables.MathObservable;
 
 
 /**
@@ -69,8 +70,7 @@ public class EdgeDeleteListenerImpl implements 
EdgeDeleteListener {
                                    Observable<Integer> targetDelete = 
edgeMetaRepair
                                            .repairTargets( scope, 
edge.getTargetNode(), edge.getType(), maxTimestamp );
 
-                                   return Observable.zip( sourceDelete, 
targetDelete,
-                                       ( sourceCount, targetCount ) -> 
sourceCount + targetCount );
+                                   return MathObservable.sumInteger( 
Observable.merge( sourceDelete, targetDelete ) );
                                } );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index 9b84978..643dcc1 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -79,52 +79,25 @@ public class EdgeDeleteRepairImpl implements 
EdgeDeleteRepair {
 
 
         //merge source and target then deal with the distinct values
-        return Observable.just( edge ).flatMap( new Func1<MarkedEdge, 
Observable<? extends MarkedEdge>>() {
-            @Override
-            public Observable<? extends MarkedEdge> call( final MarkedEdge 
edge ) {
-
-                return getEdgeVersions( scope, edge, storageSerialization 
).take( 1 )
-                        .doOnNext( new Action1<MarkedEdge>() {
-                            @Override
-                            public void call( final MarkedEdge markedEdge ) {
-                                //it's still in the same state as it was when 
we queued it. Remove it
-                                if ( edge.equals( markedEdge ) ) {
-                                    LOG.info( "Removing edge {} ", edge );
-
-                                    //remove from the commit log
-
-
-                                    //remove from storage
-                                    try {
-                                        storageSerialization.deleteEdge( 
scope, edge, timestamp ).execute();
-                                    }
-                                    catch ( ConnectionException e ) {
-                                        throw new RuntimeException( "Unable to 
connect to casandra", e );
-                                    }
-                                }
-                            }
-                        } );
-            }
-        } );
+        return Observable.just( edge )
+                .doOnNext( markedEdge -> {
+                    //it's still in the same state as it was when we queued 
it. Remove it
+                        LOG.info( "Removing edge {} ", markedEdge );
+
+                        //remove from the commit log
+
+
+                        //remove from storage
+                        try {
+                            storageSerialization.deleteEdge( scope, 
markedEdge, timestamp ).execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new RuntimeException( "Unable to connect to 
casandra", e );
+                        }
+                    }
+              );
     }
 
 
-    /**
-     * Get all edge versions <= the specified max from the source
-     */
-    private Observable<MarkedEdge> getEdgeVersions( final ApplicationScope 
scope, final Edge edge,
-                                                    final EdgeSerialization 
serialization ) {
 
-        return Observable.create( new ObservableIterator<MarkedEdge>( 
"edgeVersions" ) {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-
-                final SimpleSearchByEdge search =
-                        new SimpleSearchByEdge( edge.getSourceNode(), 
edge.getType(), edge.getTargetNode(),
-                                edge.getTimestamp(), 
SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() );
-
-                return serialization.getEdgeVersions( scope, search );
-            }
-        } );
-    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index d416b1d..4b2ffc8 100644
--- 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -20,9 +20,9 @@ package org.apache.usergrid.persistence.graph;
 
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
-import com.google.common.base.Optional;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -35,11 +35,14 @@ import org.apache.usergrid.persistence.core.test.ITRunner;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.core.util.IdGenerator;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
+import com.google.common.base.Optional;
 import com.google.inject.Inject;
 
 import rx.Observable;
@@ -836,7 +839,7 @@ public class GraphManagerIT {
         //now load the next page
 
         //tests that even if a prefix is specified, the last takes precedence
-        edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), 
null, Optional.fromNullable("test") );
+        edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), 
null, Optional.fromNullable( "test" ) );
 
         edges = gm.getEdgeTypesFromSource( edgeTypes );
 
@@ -914,7 +917,7 @@ public class GraphManagerIT {
 
         //now load the next page
 
-        edgeTypes = new SimpleSearchEdgeType( testTargetEdge2.getTargetNode(), 
null,  Optional.fromNullable("test") );
+        edgeTypes = new SimpleSearchEdgeType( testTargetEdge2.getTargetNode(), 
null, Optional.fromNullable( "test" ) );
 
         edges = gm.getEdgeTypesToTarget( edgeTypes );
 
@@ -2321,6 +2324,70 @@ public class GraphManagerIT {
 
         em.markEdge( null );
     }
+
+
+    @Test
+    public void testReadMultipleVersionOrder() {
+
+        GraphManager gm = emf.createEdgeManager( scope );
+
+        final Id sourceId = createId( "source" );
+
+        final Id target = createId( "target" );
+
+
+        //write 3 edges with 3 different timestamps (1, 2 and 3) for the same source, type and target
+        final Edge edge1 = createEdge( sourceId, "test", target, 1, false );
+
+        gm.writeEdge( edge1 ).toBlocking().last();
+
+        final Edge edge2 = createEdge( sourceId, "test", target, 2, false );
+
+        gm.writeEdge( edge2 ).toBlocking().last();
+
+        final Edge edge3 = createEdge( sourceId, "test", target, 3, false );
+
+        gm.writeEdge( edge3 ).toBlocking().last();
+
+        //now test retrieving all versions of the edge via loadEdgeVersions
+
+
+        final SearchByEdge searchDescending =
+            new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), 
edge1.getTargetNode(),Long.MAX_VALUE,
+                SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()   );
+
+        final Observable<Edge> edgesDescending = gm.loadEdgeVersions( 
searchDescending );
+
+        //search descending: expect edge3, edge2, edge1
+        final List<Edge> descending = 
edgesDescending.toList().toBlocking().single();
+
+        assertEquals( "Correct size returned", 3, descending.size() );
+
+        assertEquals( "Correct edges returned", edge3, descending.get( 0 ) );
+
+        assertEquals( "Correct edges returned", edge2, descending.get( 1 ) );
+
+        assertEquals( "Correct edges returned", edge1, descending.get( 2 ) );
+
+
+        //now search ascending: expect edge1, edge2, edge3
+
+        final SearchByEdge searchAscending =
+                    new SimpleSearchByEdge( edge1.getSourceNode(), 
edge1.getType(), edge1.getTargetNode(),Long.MAX_VALUE,
+                        SearchByEdgeType.Order.ASCENDING, 
Optional.<Edge>absent()   );
+
+        Observable<Edge> edgesAscending = gm.loadEdgeVersions( searchAscending 
);
+
+        List<Edge> ascending = edgesAscending.toList().toBlocking().single();
+
+        assertEquals( "Correct size returned", 3, ascending.size() );
+
+        assertEquals( "Correct edges returned", edge1, ascending.get( 0 ) );
+
+        assertEquals( "Correct edges returned", edge2, ascending.get( 1 ) );
+
+        assertEquals( "Correct edges returned", edge3, ascending.get( 2 ) );
+    }
 }
 
 

Reply via email to